You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by dv...@apache.org on 2011/12/22 23:14:07 UTC
svn commit: r1222488 [2/2] - in /pig/trunk: ./ test/org/apache/pig/test/
Modified: pig/trunk/test/org/apache/pig/test/TestPruneColumn.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestPruneColumn.java?rev=1222488&r1=1222487&r2=1222488&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestPruneColumn.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestPruneColumn.java Thu Dec 22 22:14:06 2011
@@ -41,16 +41,12 @@ import org.apache.log4j.FileAppender;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.SimpleLayout;
-import org.apache.pig.ExecType;
import org.apache.pig.FilterFunc;
import org.apache.pig.LoadFunc;
import org.apache.pig.LoadPushDown;
import org.apache.pig.PigServer;
-import org.apache.pig.LoadPushDown.RequiredFieldList;
-import org.apache.pig.LoadPushDown.RequiredFieldResponse;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigTextInputFormat;
-import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.io.FileLocalizer;
@@ -89,13 +85,13 @@ public class TestPruneColumn extends Tes
}
static public class MyFilterFunc extends FilterFunc {
-
+
@Override
public Boolean exec(Tuple input) {
return true;
}
}
-
+
@Before
@Override
public void setUp() throws Exception{
@@ -106,15 +102,14 @@ public class TestPruneColumn extends Tes
logFile = File.createTempFile("log", "");
FileAppender appender = new FileAppender(layout, logFile.toString(), false, false, 0);
logger.addAppender(appender);
-
+
pigServer = new PigServer("local");
- //pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
tmpFile1 = File.createTempFile("prune", "txt");
PrintStream ps = new PrintStream(new FileOutputStream(tmpFile1));
ps.println("1\t2\t3");
ps.println("2\t5\t2");
ps.close();
-
+
tmpFile2 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile2));
ps.println("1\t1");
@@ -132,13 +127,13 @@ public class TestPruneColumn extends Tes
ps.println("1\t2\t3");
ps.println("1\t2\t3");
ps.close();
-
+
tmpFile5 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile5));
ps.println("1\t2\t3\t4");
ps.println("2\t3\t4\t5");
ps.close();
-
+
tmpFile6 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile6));
ps.println("\t2\t3");
@@ -150,13 +145,13 @@ public class TestPruneColumn extends Tes
ps.println("1\t1\t1");
ps.println("2\t2\t2");
ps.close();
-
+
tmpFile8 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile8));
ps.println("1\t2\t3\t4");
ps.println("2\t5\t2\t3");
ps.close();
-
+
tmpFile9 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile9));
ps.println("1\t[key1#1,key2#2]\t[key3#8,key4#9]");
@@ -174,14 +169,14 @@ public class TestPruneColumn extends Tes
ps.println("1\t3\t2");
ps.println("2\t5\t2");
ps.close();
-
+
tmpFile12 = File.createTempFile("prune", "txt");
ps = new PrintStream(new FileOutputStream(tmpFile12));
ps.println("[key1#1,key2#2,cond#1]");
ps.println("[key1#2,key2#3,cond#1]");
ps.close();
}
-
+
@After
@Override
public void tearDown() throws Exception{
@@ -199,11 +194,11 @@ public class TestPruneColumn extends Tes
tmpFile12.delete();
logFile.delete();
}
-
+
public boolean checkLogFileMessage(String[] messages)
{
BufferedReader reader = null;
-
+
try {
reader = new BufferedReader(new FileReader(logFile));
List<String> logMessages=new ArrayList<String>();
@@ -212,7 +207,7 @@ public class TestPruneColumn extends Tes
{
logMessages.add(line);
}
-
+
// Check if all messages appear in the log
for (int i=0;i<messages.length;i++)
{
@@ -225,7 +220,7 @@ public class TestPruneColumn extends Tes
if (!found)
return false;
}
-
+
// Check no other log besides messages
for (int i=0;i<logMessages.size();i++) {
boolean found = false;
@@ -248,7 +243,7 @@ public class TestPruneColumn extends Tes
return false;
}
}
-
+
public boolean emptyLogFileMessage()
{
if (!logFile.exists())
@@ -274,79 +269,79 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue((Integer)t.get(0) == 2);
assertTrue((Integer)t.get(1) == 3);
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue((Integer)t.get(0) == 5);
assertTrue((Integer)t.get(1) == 2);
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
-
+
@Test
public void testLoadForEach2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue((Integer)t.get(0) == 1);
assertTrue((Integer)t.get(1) == 3);
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue((Integer)t.get(0) == 2);
assertTrue((Integer)t.get(1) == 2);
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
-
+
@Test
public void testLoadForEach3() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1;");
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue((Integer)t.get(0) == 1);
assertTrue((Integer)t.get(1) == 2);
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue((Integer)t.get(0) == 2);
assertTrue((Integer)t.get(1) == 5);
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
-
+
@Test
public void testJoin1() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = join A by a1, B by b1;");
pigServer.registerQuery("D = foreach C generate a1, a2, b0, b1;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==4);
@@ -354,84 +349,84 @@ public class TestPruneColumn extends Tes
assertTrue(t.get(1).equals(3));
assertTrue(t.get(2).equals(2));
assertTrue(t.get(3).equals(2));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
-
+
@Test
public void testJoin2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = join A by a1, B by b1;");
pigServer.registerQuery("D = foreach C generate a1, a2, b1;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==3);
assertTrue(t.get(0).equals(2));
assertTrue(t.get(1).equals(3));
assertTrue(t.get(2).equals(2));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
"Columns pruned for B: $0"}));
}
-
+
@Test
public void testForEachFilter() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = filter A by a2==3;");
pigServer.registerQuery("C = foreach B generate a0, a1;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).equals(1));
assertTrue(t.get(1).equals(2));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testForEach1() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0, a1+a2;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).equals(1));
assertTrue(t.get(1).equals(5));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).equals(2));
assertTrue(t.get(1).equals(7));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testForEach2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = foreach A generate a0 as b0, *;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==4);
@@ -439,7 +434,7 @@ public class TestPruneColumn extends Tes
assertTrue(t.get(1).equals(1));
assertTrue(t.get(2).equals(2));
assertTrue(t.get(3).equals(3));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==4);
@@ -447,94 +442,94 @@ public class TestPruneColumn extends Tes
assertTrue(t.get(1).equals(2));
assertTrue(t.get(2).equals(5));
assertTrue(t.get(3).equals(2));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testSplit1() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
pigServer.registerQuery("D = foreach B generate $1;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
-
+
@Test
public void testSplit2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("split A into B if $0<=1, C if $0>1;");
pigServer.registerQuery("D = foreach B generate $1;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
-
+
@Test
public void testForeachNoSchema1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("B = foreach A generate $1, $2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("3"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("5"));
assertTrue(t.get(1).toString().equals("2"));
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testForeachNoSchema2() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "';");
pigServer.registerQuery("B = foreach A generate $1, 'aoeuaoeu';");
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("aoeuaoeu"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("5"));
assertTrue(t.get(1).toString().equals("aoeuaoeu"));
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testCoGroup1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
@@ -542,72 +537,72 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = cogroup A by $1, B by $1;");
pigServer.registerQuery("D = foreach C generate AVG($1.$1);");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0)==null);
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2.0"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("5.0"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
}
-
+
@Test
public void testCoGroup2() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = group A all;");
pigServer.registerQuery("C = foreach B generate $1;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
String[] expected = new String[] {
"({(1,2,3),(2,5,2)})"
};
assertTrue(iter.hasNext());
- Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
-
+ Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testCoGroup3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = group A by $1;");
pigServer.registerQuery("C = foreach B generate $1, '1';");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("{(1,2,3)}"));
assertTrue(t.get(1).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("{(2,5,2)}"));
assertTrue(t.get(1).toString().equals("1"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testCoGroup4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
@@ -615,125 +610,125 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate $1.$1, $2.$1;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("{}"));
assertTrue(t.get(1).toString().equals("{(1)}"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("{(2)}"));
assertTrue(t.get(1).toString().equals("{(2)}"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("{(5)}"));
assertTrue(t.get(1).toString().equals("{}"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testCoGroup5() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = group A by (a0, a1);");
pigServer.registerQuery("C = foreach B generate flatten(group);");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("2"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("5"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
-
+
@Test
public void testDistinct1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile4.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = distinct A;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testStream1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = stream A through `" + simpleEchoStreamingCommand + "`;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testBinCond1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile5.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2, a3);");
pigServer.registerQuery("B = foreach A generate ($1 == '2'? $2 : $3);");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("3"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("5"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
-
+
@Test
public void testCoGroup6() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
@@ -741,28 +736,28 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate A, flatten(B.($0, $1));");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==3);
assertTrue(t.get(0).toString().equals("{}"));
assertTrue(t.get(1).toString().equals("1"));
assertTrue(t.get(2).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==3);
assertTrue(t.get(0).toString().equals("{(1,2,3)}"));
assertTrue(t.get(1).toString().equals("2"));
assertTrue(t.get(2).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testCoGroup7() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
@@ -770,30 +765,30 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C {B = order B by $0;generate FLATTEN(A), B.($1);};");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==4);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("2"));
assertTrue(t.get(2).toString().equals("3"));
assertTrue(t.get(3).toString().equals("{(2)}"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==4);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("5"));
assertTrue(t.get(2).toString().equals("2"));
assertTrue(t.get(3).toString().equals("{}"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testCross1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
@@ -801,43 +796,43 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = cross A, B;");
pigServer.registerQuery("D = foreach C generate $0, $3;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
Collection<String> results = new HashSet<String>();
results.add("(1,1)");
results.add("(2,1)");
results.add("(1,2)");
results.add("(2,2)");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(results.contains(t.toString()));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(results.contains(t.toString()));
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(results.contains(t.toString()));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(results.contains(t.toString()));
assertFalse(iter.hasNext());
-
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
"Columns pruned for B: $1"}));
}
-
+
@Test
public void testUnion1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
@@ -845,37 +840,37 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = union A, B;");
pigServer.registerQuery("D = foreach C generate $0, $2;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
Collection<String> results = new HashSet<String>();
results.add("(1,3)");
results.add("(2,2)");
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
results.contains(t.toString());
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
results.contains(t.toString());
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
results.contains(t.toString());
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
results.contains(t.toString());
assertFalse(iter.hasNext());
-
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
"Columns pruned for B: $1"}));
}
@@ -886,27 +881,27 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = join A by $0, B by $0 using 'replicated';");
pigServer.registerQuery("D = foreach C generate $0, $3;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("2"));
assertFalse(iter.hasNext());
-
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
"Columns pruned for B: $1"}));
}
-
+
@Test
public void testFilter1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
@@ -914,120 +909,120 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = limit B 10;");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $2"}));
}
-
+
@Test
public void testFilter2() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = filter A by a0+a2 == 4;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
-
+
@Test
public void testOrderBy1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by $0;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2"}));
}
-
+
@Test
public void testOrderBy2() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testCogroup8() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = group A by *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("(1,2,3)"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("(2,5,2)"));
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testJoin3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
@@ -1035,72 +1030,72 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = join A by *, B by * using 'replicated';");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testLoadForEach4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate *;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2"}));
}
-
+
@Test
public void testForEachUDF() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
pigServer.registerQuery("B = foreach A generate StringSize(*);");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testOutJoin1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile6.toString(), pigServer.getPigContext()) + "' AS (a0:chararray, a1:chararray, a2:chararray);");
@@ -1108,50 +1103,50 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = join A by $0 left, B by $0;");
pigServer.registerQuery("D = foreach C generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
Collection<String> results = new HashSet<String>();
results.add("(2)");
results.add("()");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(results.contains(t.toString()));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(results.contains(t.toString()));
-
+
assertFalse(iter.hasNext());
-
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
"Columns pruned for B: $1, $2"}));
}
-
+
@Test
public void testFilter3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = filter A by " + MyFilterFunc.class.getName() + "(*) ;");
pigServer.registerQuery("C = foreach B generate $0;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
@@ -1159,139 +1154,139 @@ public class TestPruneColumn extends Tes
public void testMapKey1() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a0, a1#'key1';");
-
+
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).equals(1));
assertTrue(t.get(1).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).equals(2));
assertTrue(t.get(1).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Map key required for A: $1->[key1]"}));
}
-
+
@Test
public void testMapKey2() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
pigServer.registerQuery("C = foreach B generate $0#'key2', $1;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("4"));
assertTrue(t.get(1).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
"Map key required for A: $1->[key2, key1]"}));
}
-
+
@Test
public void testMapKey3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate a1, a1#'key1';");
pigServer.registerQuery("C = group B all;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
String[] expected = new String[] {
"(all,{([key2#2,key1#1],1),([key2#4,key1#2],2)})"
};
-
+
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("C")));
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0"}));
}
-
+
@Test
public void testMapKey4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = limit A 10;");
pigServer.registerQuery("C = foreach B generate $0, $1#'key1';");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Map key required for A: $1->[key1]"}));
}
-
+
@Test
public void testMapKey5() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:map[]);");
pigServer.registerQuery("B = foreach A generate $0, $1#'key1';");
pigServer.registerQuery("C = stream B through `" + simpleEchoStreamingCommand + "`;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Map key required for A: $1->[key1]"}));
}
-
+
@Test
public void testMapKeyInSplit1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile12.toString(), pigServer.getPigContext()) + "' as (m:map[]);");
pigServer.registerQuery("B = foreach A generate m#'key1' as key1;");
pigServer.registerQuery("C = foreach A generate m#'key2' as key2;");
pigServer.registerQuery("D = join B by key1, C by key2;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Map key required for A: $0->[key2, key1]"}));
}
-
+
@SuppressWarnings("rawtypes")
@Test
public void testMapKeyInSplit2() throws Exception {
@@ -1300,9 +1295,9 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("C = filter B by m#'key1'==1;");
pigServer.registerQuery("D = filter B by m#'key2'==2;");
pigServer.registerQuery("E = join C by m#'key1', D by m#'key1';");
-
+
Iterator<Tuple> iter = pigServer.openIterator("E");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
@@ -1312,62 +1307,62 @@ public class TestPruneColumn extends Tes
assertTrue(((Map)t.get(1)).get("key1").toString().equals("1"));
assertTrue(((Map)t.get(1)).get("key2").toString().equals("2"));
assertTrue(((Map)t.get(1)).get("cond").toString().equals("1"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(this.emptyLogFileMessage());
}
-
+
@Test
public void testConstantPlan() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate 1, a2;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("3"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1"}));
}
-
+
@Test
public void testPlainPlan() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.registerQuery("B = order A by $0;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==3);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("2"));
assertTrue(t.get(2).toString().equals("3"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==3);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("5"));
assertTrue(t.get(2).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testBinStorage1() throws Exception {
// get a temp intermediate filename
@@ -1375,29 +1370,29 @@ public class TestPruneColumn extends Tes
intermediateFile.delete(); // delete since we don't want the file to be present
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.store("A", intermediateFile.toString(), "BinStorage()");
-
- pigServer.registerQuery("A = load '"+ intermediateFile.toString()
+
+ pigServer.registerQuery("A = load '"+ intermediateFile.toString()
+ "' using BinStorage() as (a0, a1, a2);");
-
+
pigServer.registerQuery("B = foreach A generate a0;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2"}));
}
-
+
@Test
public void testBinStorage2() throws Exception {
// get a temp intermediate filename
@@ -1405,102 +1400,102 @@ public class TestPruneColumn extends Tes
intermediateFile.delete(); // delete since we don't want the file to be present
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
pigServer.store("A", intermediateFile.toString(), "BinStorage()");
-
- pigServer.registerQuery("A = load '"+ intermediateFile.toString()
+
+ pigServer.registerQuery("A = load '"+ intermediateFile.toString()
+ "' using BinStorage() as (a0, a1, a2);");
-
+
pigServer.registerQuery("B = foreach A generate a2, a0, a1;");
pigServer.registerQuery("C = foreach B generate a0, a2;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("3"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
-
+
}
-
+
@Test
public void testProjectCastKeyLookup() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ "' as (a0, a1);");
-
+
pigServer.registerQuery("B = foreach A generate a1#'key1';");
-
+
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
"Map key required for A: $1->[key1]"}));
}
-
+
@Test
public void testRelayFlattenMap() throws Exception {
- pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile3.toString(), pigServer.getPigContext())
+ "' as (a0, a1:map[]);");
-
+
pigServer.registerQuery("B = foreach A generate flatten(a1);");
pigServer.registerQuery("C = foreach B generate a1#'key1';");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("1"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("2"));
-
+
assertFalse(iter.hasNext());
-
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0",
"Map key required for A: $1->[key1]"}));
}
-
+
@Test
public void testCrossAtLeastOneColumnOneInput() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' as (a0:int, a1:int, a2:int);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' as (b0:int, b1:int);");
pigServer.registerQuery("C = cross A, B;");
pigServer.registerQuery("D = foreach C generate $0;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("D");
Collection<String> results = new HashSet<String>();
results.add("(1)");
results.add("(2)");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==1);
assertTrue(results.contains(t.toString()));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==1);
@@ -1517,11 +1512,11 @@ public class TestPruneColumn extends Tes
assertTrue(results.contains(t.toString()));
assertFalse(iter.hasNext());
-
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1, $2",
"Columns pruned for B: $1"}));
}
-
+
@Test
public void testComplex1() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile7.toString(), pigServer.getPigContext()) + "' as (a0, a1, a2);");
@@ -1533,71 +1528,71 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("F = filter E by $1<10;");
pigServer.registerQuery("G = group F by $0;");
pigServer.registerQuery("H = foreach G generate $1;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("H");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==1);
assertTrue(t.get(0).toString().equals("{(2,2)}"));
-
+
assertFalse(iter.hasNext());
-
- assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
+
+ assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1",
"Columns pruned for B: $1"}));
}
-
+
@Test
public void testCoGroup8() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1:int, a2);");
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (b0, b1:int);");
pigServer.registerQuery("C = cogroup A by ($1), B by ($1);");
pigServer.registerQuery("D = foreach C generate $0, $1;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("D");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("1"));
assertTrue(t.get(1).toString().equals("{}"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("2"));
assertTrue(t.get(1).toString().equals("{(1,2,3)}"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.size()==2);
assertTrue(t.get(0).toString().equals("5"));
assertTrue(t.get(1).toString().equals("{(2,5,2)}"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0"}));
}
-
+
// See PIG-1128
@Test
public void testUserDefinedSchema() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS ( c1 : chararray, c2 : int);");
pigServer.registerQuery("B = foreach A generate c1 as c1 : chararray, c2 as c2 : int, 'CA' as state : chararray;");
pigServer.registerQuery("C = foreach B generate c1 as c1 : chararray;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.toString().equals("(1)"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.toString().equals("(2)"));
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
@@ -1607,18 +1602,18 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile10.toString(), pigServer.getPigContext()) + "' AS (a0, a1:map[], a2);");
pigServer.registerQuery("B = foreach A generate a1;");
pigServer.registerQuery("C = limit B 10;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.toString().equals("([2#1,1#1])"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $2"}));
}
-
+
// See PIG-1142
@Test
public void testJoin4() throws Exception {
@@ -1626,83 +1621,83 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("B = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (b0, b1, b2);");
pigServer.registerQuery("C = join A by a2, B by b2;");
pigServer.registerQuery("D = foreach C generate $0, $1, $2;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("D");
Collection<String> results = new HashSet<String>();
results.add("(1,2,3)");
results.add("(2,5,2)");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(results.contains(t.toString()));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(results.contains(t.toString()));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for B: $0, $1"}));
}
-
+
@Test
public void testFilter4() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
pigServer.registerQuery("B = filter A by a2==3;");
pigServer.registerQuery("C = foreach B generate $2;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.toString().equals("(3)"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1"}));
}
-
+
@Test
public void testSplit3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2:int);");
pigServer.registerQuery("split A into B if a2==3, C if a2<3;");
pigServer.registerQuery("C = foreach B generate $2;");
-
+
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.toString().equals("(3)"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1"}));
}
-
+
@Test
public void testOrderBy3() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
pigServer.registerQuery("B = order A by a2;");
pigServer.registerQuery("C = foreach B generate a2;");
Iterator<Tuple> iter = pigServer.openIterator("C");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.toString().equals("(2)"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==1);
assertTrue(t.toString().equals("(3)"));
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $1"}));
}
-
+
@Test
public void testCogroup9() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' AS (a0, a1, a2);");
@@ -1711,21 +1706,21 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("D = cogroup A by a2, B by b2, C by c2;");
pigServer.registerQuery("E = foreach D generate $1, $2;");
Iterator<Tuple> iter = pigServer.openIterator("E");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.toString().equals("({(2,5,2)},{(2,5,2)})"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.toString().equals("({(1,2,3)},{(1,2,3)})"));
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for C: $0, $1"}));
}
@@ -1738,18 +1733,18 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("D = join C by a1, B by b0;");
pigServer.registerQuery("E = foreach D generate a1, b0, b1;");
Iterator<Tuple> iter = pigServer.openIterator("E");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==3);
assertTrue(t.toString().equals("(2,2,2)"));
-
+
assertFalse(iter.hasNext());
-
+
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $0, $2"}));
}
-
+
// See PIG-1146
@Test
public void testUnionMixedPruning() throws Exception {
@@ -1793,7 +1788,7 @@ public class TestPruneColumn extends Tes
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
-
+
// See PIG-1176
@Test
public void testUnionMixedSchemaPruning() throws Exception {
@@ -1837,12 +1832,12 @@ public class TestPruneColumn extends Tes
assertTrue(emptyLogFileMessage());
}
-
+
// See PIG-1184
@Test
public void testForEachFlatten() throws Exception {
File inputFile = Util.createInputFile("table_testForEachFlatten", "", new String[]{"oiue\tM\t{(3),(4)}\t{(toronto),(montreal)}"});
-
+
pigServer.registerQuery("A = load '"+inputFile.toString()+"' as (a0:chararray, a1:chararray, a2:bag{t:tuple(id:chararray)}, a3:bag{t:tuple(loc:chararray)});");
pigServer.registerQuery("B = foreach A generate a0, a1, flatten(a2), flatten(a3), 10;");
pigServer.registerQuery("C = foreach B generate a0, $4;");
@@ -1868,7 +1863,7 @@ public class TestPruneColumn extends Tes
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
-
+
// See PIG-1210
@Test
public void testFieldsToReadDuplicatedEntry() throws Exception {
@@ -1879,7 +1874,7 @@ public class TestPruneColumn extends Tes
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.toString().equals("(2.0,2,3)"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.toString().equals("(4.0,5,2)"));
@@ -1888,7 +1883,7 @@ public class TestPruneColumn extends Tes
assertTrue(emptyLogFileMessage());
}
-
+
// See PIG-1272
@Test
public void testSplit4() throws Exception {
@@ -1900,14 +1895,14 @@ public class TestPruneColumn extends Tes
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.toString().equals("(1,2,3,1)"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.toString().equals("(2,5,2,2)"));
assertTrue(emptyLogFileMessage());
}
-
+
@Test
public void testSplit5() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile11.toString(), pigServer.getPigContext()) + "' AS (a0:int, a1:int, a2:int);");
@@ -1922,9 +1917,9 @@ public class TestPruneColumn extends Tes
"(1,3,2,1,3)",
"(2,5,2,2,5)"
};
-
+
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("D")));
-
+
assertTrue(emptyLogFileMessage());
}
@@ -1940,14 +1935,14 @@ public class TestPruneColumn extends Tes
assertTrue(iter.hasNext());
Tuple t = iter.next();
assertTrue(t.toString().equals("(1,3)"));
-
+
assertTrue(iter.hasNext());
t = iter.next();
assertTrue(t.toString().equals("(2,2)"));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
}
-
+
// See PIG-1644
@Test
public void testSplitOutputWithForEach() throws Exception {
@@ -1965,27 +1960,27 @@ public class TestPruneColumn extends Tes
BufferedReader reader1 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output1.toString(), pigServer.getPigContext().getProperties())));
String line = reader1.readLine();
assertTrue(line.equals("1\t2\t3"));
-
+
line = reader1.readLine();
assertTrue(line.equals("2\t3\t4"));
-
+
assertTrue(reader1.readLine()==null);
-
+
BufferedReader reader2 = new BufferedReader(new InputStreamReader(FileLocalizer.openDFSFile(output2.toString(), pigServer.getPigContext().getProperties())));
line = reader2.readLine();
assertTrue(line.equals("3"));
-
+
line = reader2.readLine();
assertTrue(line.equals("4"));
-
+
assertTrue(reader2.readLine()==null);
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $3"}));
-
+
reader1.close();
reader2.close();
}
-
+
static public class PruneColumnEvalFunc extends LoadFunc implements LoadPushDown {
String[] aliases;
String signature;
@@ -2024,7 +2019,7 @@ public class TestPruneColumn extends Tes
public void prepareToRead(RecordReader reader, PigSplit split)
throws IOException {
}
-
+
@Override
public void setUDFContextSignature(String signature) {
this.signature = signature;
@@ -2042,23 +2037,23 @@ public class TestPruneColumn extends Tes
return null;
}
}
-
+
public void testAliasInRequiredFieldList() throws Exception{
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile1.toString(), pigServer.getPigContext()) + "' using "
+ PruneColumnEvalFunc.class.getName() +"() as (a0, a1, a2);");
pigServer.registerQuery("B = foreach A generate a1, a2;");
Iterator<Tuple> iter = pigServer.openIterator("B");
-
+
assertTrue(iter.hasNext());
Tuple t = iter.next();
-
+
assertTrue(t.size()==2);
assertTrue(t.get(0).equals("a1"));
assertTrue(t.get(1).equals("a2"));
-
+
assertFalse(iter.hasNext());
}
-
+
@Test
public void testCogroup10() throws Exception {
pigServer.registerQuery("A = load '"+ Util.generateURI(tmpFile2.toString(), pigServer.getPigContext()) + "' AS (a0, a1:double);");
@@ -2068,9 +2063,9 @@ public class TestPruneColumn extends Tes
pigServer.registerQuery("E = join B by joinField, D by joinField;");
pigServer.registerQuery("F = foreach E generate a0;");
Iterator<Tuple> iter = pigServer.openIterator("F");
-
+
String[] expected = new String[] {"(1)", "(2)"};
-
+
Util.checkQueryOutputsAfterSortRecursive(iter, expected, org.apache.pig.newplan.logical.Util.translateSchema(pigServer.dumpSchema("F")));
assertTrue(checkLogFileMessage(new String[]{"Columns pruned for A: $1"}));
Modified: pig/trunk/test/org/apache/pig/test/TestUTF8.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUTF8.java?rev=1222488&r1=1222487&r2=1222488&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUTF8.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUTF8.java Thu Dec 22 22:14:06 2011
@@ -26,7 +26,6 @@ import java.io.PrintWriter;
import java.io.StringWriter;
import java.util.Iterator;
-import org.junit.AfterClass;
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -42,24 +41,17 @@ import org.apache.pig.tools.parameters.P
import junit.framework.TestCase;
@RunWith(JUnit4.class)
public class TestUTF8 extends TestCase {
-
- static MiniCluster cluster = MiniCluster.buildCluster();
private PigServer pigServer;
@Before
@Override
public void setUp() throws Exception{
- pigServer = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
+ pigServer = new PigServer(ExecType.LOCAL, System.getProperties());
}
-
- @AfterClass
- public static void oneTimeTearDown() throws Exception {
- cluster.shutDown();
- }
-
+
@Test
public void testPigStorage() throws Exception{
-
+
File f1 = File.createTempFile("tmp", "");
PrintWriter pw = new PrintWriter(f1, "UTF-8");
pw.println("中文");
@@ -68,19 +60,19 @@ public class TestUTF8 extends TestCase {
pw.println("ภาษาไทย");
pw.close();
- pigServer.registerQuery("a = load '"
- + Util.generateURI(f1.toString(), pigServer.getPigContext())
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(f1.toString(), pigServer.getPigContext())
+ "' using " + PigStorage.class.getName() + "();");
Iterator<Tuple> iter = pigServer.openIterator("a");
-
+
assertEquals(DataType.toString(iter.next().get(0)), "中文");
assertEquals(DataType.toString(iter.next().get(0)), "にほんご");
assertEquals(DataType.toString(iter.next().get(0)), "한국어");
assertEquals(DataType.toString(iter.next().get(0)), "ภาษาไทย");
-
+
f1.delete();
}
-
+
@Test
public void testScriptParser() throws Throwable {
@@ -93,42 +85,42 @@ public class TestUTF8 extends TestCase {
grunt.exec();
}
-
+
@Test
public void testQueryParser() throws Exception{
File f1 = File.createTempFile("tmp", "");
PrintWriter pw = new PrintWriter(f1, "UTF-8");
pw.println("中文");
pw.close();
-
- pigServer.registerQuery("a = load '"
- + Util.generateURI(f1.toString(), pigServer.getPigContext())
+
+ pigServer.registerQuery("a = load '"
+ + Util.generateURI(f1.toString(), pigServer.getPigContext())
+ "' using " + PigStorage.class.getName() + "();");
pigServer.registerQuery("b = filter a by $0 == '中文';");
Iterator<Tuple> iter = pigServer.openIterator("a");
-
+
assertEquals(DataType.toString(iter.next().get(0)), "中文");
f1.delete();
}
-
+
@Test
public void testParamSubstitution() throws Exception{
File queryFile = File.createTempFile("query", "");
PrintWriter ps = new PrintWriter(queryFile);
ps.println("b = filter a by $0 == '$querystring';");
ps.close();
-
+
String[] arg = {"querystring='中文'"};
-
+
ParameterSubstitutionPreprocessor psp = new ParameterSubstitutionPreprocessor(50);
BufferedReader pigIStream = new BufferedReader(new FileReader(queryFile.toString()));
StringWriter pigOStream = new StringWriter();
psp.genSubstitutedFile(pigIStream , pigOStream , arg, null);
-
+
assertTrue(pigOStream.toString().contains("中文"));
-
+
queryFile.delete();
}
}
Modified: pig/trunk/test/org/apache/pig/test/TestUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestUnion.java?rev=1222488&r1=1222487&r2=1222488&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestUnion.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestUnion.java Thu Dec 22 22:14:06 2011
@@ -17,8 +17,6 @@
*/
package org.apache.pig.test;
-import static org.junit.Assert.assertEquals;
-
import java.io.File;
import java.io.IOException;
import java.util.Iterator;
@@ -35,8 +33,6 @@ import org.apache.pig.data.DefaultTuple;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.io.FileSpec;
-import org.apache.pig.impl.io.FileLocalizer;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
@@ -67,7 +63,7 @@ import org.junit.runners.JUnit4;
* |---For Each - -7192652407897311774
* |
* |---Load - --3816247505117325386
- *
+ *
* Tests the Start Plan operator with the above plan.
* The verification is done as follows:
* Both loads load the same file(/etc/passwd).
@@ -86,8 +82,12 @@ public class TestUnion extends junit.fra
DataBag expBag;
static MiniCluster cluster = MiniCluster.buildCluster();
PigContext pc = new PigContext();
+ PigServer pigServer;
+
+ @Override
@Before
public void setUp() throws Exception {
+ pigServer = new PigServer(ExecType.LOCAL, cluster.getProperties());
pc.connect();
GenPhyOp.setPc(pc);
POLoad ld1 = GenPhyOp.topLoadOp();
@@ -95,14 +95,14 @@ public class TestUnion extends junit.fra
String inpDir = curDir + File.separatorChar + "test/org/apache/pig/test/data/InputFiles/";
FileSpec fSpec = new FileSpec("file:"+ inpDir +"passwd", new FuncSpec(PigStorage.class.getName() , new String[]{":"}));
ld1.setLFile(fSpec);
-
+
POLoad ld2 = GenPhyOp.topLoadOp();
ld2.setLFile(fSpec);
-
+
POFilter fl1 = GenPhyOp.topFilterOpWithProj(1, 50, GenPhyOp.LTE);
-
+
POFilter fl2 = GenPhyOp.topFilterOpWithProj(1, 50, GenPhyOp.GT);
-
+
int[] flds = {0,2};
Tuple sample = new DefaultTuple();
sample.append(new String("S"));
@@ -115,13 +115,13 @@ public class TestUnion extends junit.fra
sample.append(new String("x"));
POForEach fe1 = GenPhyOp.topForEachOPWithPlan(flds , sample);
-
+
POForEach fe2 = GenPhyOp.topForEachOPWithPlan(flds , sample);
-
+
sp = GenPhyOp.topUnionOp();
-
+
PhysicalPlan plan = new PhysicalPlan();
-
+
plan.add(ld1);
plan.add(ld2);
plan.add(fl1);
@@ -129,18 +129,18 @@ public class TestUnion extends junit.fra
plan.add(fe1);
plan.add(fe2);
plan.add(sp);
-
+
plan.connect(ld1, fe1);
plan.connect(fe1, fl1);
plan.connect(ld2, fe2);
plan.connect(fe2, fl2);
plan.connect(fl1, sp);
plan.connect(fl2, sp);
-
+
/*PlanPrinter ppp = new PlanPrinter(plan);
ppp.visit();*/
-
-
+
+
POLoad ld3 = GenPhyOp.topLoadOp();
ld3.setLFile(fSpec);
DataBag fullBag = DefaultBagFactory.getInstance().newDefaultBag();
@@ -153,15 +153,16 @@ public class TestUnion extends junit.fra
expBag = TestHelper.projectBag(fullBag, fields);
}
+ @Override
@After
public void tearDown() throws Exception {
}
-
+
@AfterClass
public static void oneTimeTearDown() throws Exception {
cluster.shutDown();
}
-
+
private Tuple castToDBA(Tuple in) throws ExecException{
Tuple res = new DefaultTuple();
for (int i=0;i<in.size();i++) {
@@ -192,7 +193,7 @@ public class TestUnion extends junit.fra
// | |-POUnion (root 2)--> This union's getNext() can lead the code here
// |
// |--POLocalRearrange (root 1)
-
+
// The inner POUnion above is a root in the plan which has 2 roots.
// So these 2 roots would have input coming from different input
// sources (dfs files). So certain maps would be working on input only
@@ -201,18 +202,17 @@ public class TestUnion extends junit.fra
// neither get input attached to it nor does it have predecessors
@Test
public void testGetNextNullInput() throws Exception {
- Util.createInputFile(cluster, "a.txt", new String[] {"1\t2\t3", "4\t5\t6"});
- Util.createInputFile(cluster, "b.txt", new String[] {"7\t8\t9", "1\t200\t300"});
- Util.createInputFile(cluster, "c.txt", new String[] {"1\t20\t30"});
- FileLocalizer.deleteTempFiles();
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- pig.registerQuery("a = load 'a.txt' ;");
- pig.registerQuery("b = load 'b.txt';");
- pig.registerQuery("c = union a, b;");
- pig.registerQuery("d = load 'c.txt' ;");
- pig.registerQuery("e = cogroup c by $0 inner, d by $0 inner;");
- pig.explain("e", System.err);
- // output should be
+ File f1 = Util.createInputFile("tmp", "a.txt", new String[] {"1\t2\t3", "4\t5\t6"});
+ File f2 = Util.createInputFile("tmp", "b.txt", new String[] {"7\t8\t9", "1\t200\t300"});
+ File f3 = Util.createInputFile("tmp", "c.txt", new String[] {"1\t20\t30"});
+ //FileLocalizer.deleteTempFiles();
+ pigServer.registerQuery("a = load '" + f1.getAbsolutePath() + "' ;");
+ pigServer.registerQuery("b = load '" + f2.getAbsolutePath() + "';");
+ pigServer.registerQuery("c = union a, b;");
+ pigServer.registerQuery("d = load '" + f3.getAbsolutePath() + "' ;");
+ pigServer.registerQuery("e = cogroup c by $0 inner, d by $0 inner;");
+ pigServer.explain("e", System.err);
+ // output should be
// (1,{(1,2,3),(1,200,300)},{(1,20,30)})
Tuple expectedResult = new DefaultTuple();
expectedResult.append(new DataByteArray("1"));
@@ -223,24 +223,23 @@ public class TestUnion extends junit.fra
expectedResult.append(secondField);
DataBag thirdField = Util.createBag(new Tuple[]{Util.createTuple(Util.toDataByteArrays(new String[]{"1", "20", "30"}))});
expectedResult.append(thirdField);
- Iterator<Tuple> it = pig.openIterator("e");
+ Iterator<Tuple> it = pigServer.openIterator("e");
assertEquals(expectedResult, it.next());
assertFalse(it.hasNext());
}
-
+
// Test schema merge in union when one of the fields is a bag
@Test
public void testSchemaMergeWithBag() throws Exception {
- Util.createInputFile(cluster, "input1.txt", new String[] {"dummy"});
- Util.createInputFile(cluster, "input2.txt", new String[] {"dummy"});
- PigServer pig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
- Util.registerMultiLineQuery(pig, "a = load 'input1.txt';" +
- "b = load 'input2.txt';" +
+ File f1 = Util.createInputFile("tmp", "input1.txt", new String[] {"dummy"});
+ File f2 = Util.createInputFile("tmp", "input2.txt", new String[] {"dummy"});
+ Util.registerMultiLineQuery(pigServer, "a = load '" + f1.getAbsolutePath() + "';" +
+ "b = load '" + f2.getAbsolutePath() + "';" +
"c = foreach a generate 1, {(1, 'str1')};" +
"d = foreach b generate 2, {(2, 'str2')};" +
"e = union c,d;" +
"");
- Iterator<Tuple> it = pig.openIterator("e");
+ Iterator<Tuple> it = pigServer.openIterator("e");
Object[] expected = new Object[] { Util.getPigConstant("(1, {(1, 'str1')})"),
Util.getPigConstant("(2, {(2, 'str2')})")};
Object[] results = new Object[2];
@@ -262,5 +261,5 @@ public class TestUnion extends junit.fra
assertTrue(expected[j].equals(results[j]));
}
}
-
+
}