You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by rd...@apache.org on 2011/05/04 02:11:51 UTC
svn commit: r1099299 [1/2] - in /pig/trunk: CHANGES.txt
test/org/apache/pig/test/TestMultiQuery.java
test/org/apache/pig/test/TestMultiQueryBasic.java
test/org/apache/pig/test/Util.java
Author: rding
Date: Wed May 4 00:11:51 2011
New Revision: 1099299
URL: http://svn.apache.org/viewvc?rev=1099299&view=rev
Log:
PIG-2028: Speed up multiquery unit tests
Modified:
pig/trunk/CHANGES.txt
pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
pig/trunk/test/org/apache/pig/test/TestMultiQueryBasic.java
pig/trunk/test/org/apache/pig/test/Util.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1099299&r1=1099298&r2=1099299&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Wed May 4 00:11:51 2011
@@ -196,6 +196,8 @@ PIG-1696: Performance: Use System.arrayc
BUG FIXES
+PIG-2028: Speed up multiquery unit tests (rding)
+
PIG-1990: support casting of complex types with empty inner schema
to complex type with non-empty inner schema (thejas)
Modified: pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=1099299&r1=1099298&r2=1099299&view=diff
==============================================================================
--- pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Wed May 4 00:11:51 2011
@@ -21,19 +21,15 @@ import static org.junit.Assert.assertEqu
import static org.junit.Assert.assertTrue;
import java.io.File;
-import java.io.FileWriter;
-import java.io.IOException;
-import java.io.PrintWriter;
import java.util.Iterator;
import java.util.List;
-
-import junit.framework.Assert;
+import java.util.Properties;
import org.apache.pig.ExecType;
import org.apache.pig.PigServer;
import org.apache.pig.backend.executionengine.ExecJob;
import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.impl.PigContext;
import org.junit.After;
import org.junit.AfterClass;
import org.junit.Before;
@@ -41,972 +37,752 @@ import org.junit.BeforeClass;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
+
@RunWith(JUnit4.class)
public class TestMultiQuery {
- private static final MiniCluster cluster = MiniCluster.buildCluster();
-
- private PigServer myPig;
+ private static PigServer myPig;
@BeforeClass
- public static void setUpBeforeClass() throws IOException {
- Util.copyFromLocalToCluster(cluster,
+ public static void setUpBeforeClass() throws Exception {
+ Util.copyFromLocalToLocal(
"test/org/apache/pig/test/data/passwd", "passwd");
- Util.copyFromLocalToCluster(cluster,
+ Util.copyFromLocalToLocal(
"test/org/apache/pig/test/data/passwd2", "passwd2");
+ Properties props = new Properties();
+ props.setProperty("opt.multiquery", ""+true);
+ myPig = new PigServer(ExecType.LOCAL, props);
}
@AfterClass
- public static void tearDownAfterClass() throws IOException {
- Util.deleteFile(cluster, "passwd");
- Util.deleteFile(cluster, "passwd2");
- cluster.shutDown();
+ public static void tearDownAfterClass() throws Exception {
+ Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd");
+ Util.deleteFile(new PigContext(ExecType.LOCAL, new Properties()), "passwd2");
+ deleteOutputFiles();
}
@Before
public void setUp() throws Exception {
- cluster.setProperty("opt.multiquery", ""+true);
- myPig = new PigServer(ExecType.MAPREDUCE, cluster.getProperties());
deleteOutputFiles();
}
@After
public void tearDown() throws Exception {
- myPig = null;
+
}
@Test
- public void testMultiQueryJiraPig1438() {
+ public void testMultiQueryJiraPig1438() throws Exception {
// test case: merge multiple distinct jobs
String INPUT_FILE = "abc";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("1\t2\t3");
- w.println("2\t3\t4");
- w.println("1\t2\t3");
- w.println("2\t3\t4");
- w.println("1\t2\t3");
- w.close();
-
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
- myPig.setBatchOn();
-
- myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
- myPig.registerQuery("B1 = foreach A generate col1, col2;");
- myPig.registerQuery("B2 = foreach A generate col2, col3;");
- myPig.registerQuery("C1 = distinct B1;");
- myPig.registerQuery("C2 = distinct B2;");
- myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
- myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
- myPig.registerQuery("store D1 into '/tmp/output1';");
- myPig.registerQuery("store D2 into '/tmp/output2';");
-
- myPig.executeBatch();
-
- myPig.registerQuery("E = load '/tmp/output1' as (a:int, b:int);");
- Iterator<Tuple> iter = myPig.openIterator("E");
+ String[] inputData = {
+ "1\t2\t3",
+ "2\t3\t4",
+ "1\t2\t3",
+ "2\t3\t4",
+ "1\t2\t3"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE, inputData);
+
+ myPig.setBatchOn();
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(1,2)",
- "(2,3)"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
- assertEquals(expectedResults.size(), counter);
-
- myPig.registerQuery("E = load '/tmp/output2' as (a:int, b:int);");
- iter = myPig.openIterator("E");
+ myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1:int, col2:int, col3:int);");
+ myPig.registerQuery("B1 = foreach A generate col1, col2;");
+ myPig.registerQuery("B2 = foreach A generate col2, col3;");
+ myPig.registerQuery("C1 = distinct B1;");
+ myPig.registerQuery("C2 = distinct B2;");
+ myPig.registerQuery("D1 = foreach C1 generate col1, col2;");
+ myPig.registerQuery("D2 = foreach C2 generate col2, col3;");
+ myPig.registerQuery("store D1 into 'output1';");
+ myPig.registerQuery("store D2 into 'output2';");
+
+ myPig.executeBatch();
+
+ myPig.registerQuery("E = load 'output1' as (a:int, b:int);");
+ Iterator<Tuple> iter = myPig.openIterator("E");
- expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(2,3)",
- "(3,4)"
- });
-
- counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(1,2)",
+ "(2,3)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+ assertEquals(expectedResults.size(), counter);
+
+ myPig.registerQuery("E = load 'output2' as (a:int, b:int);");
+ iter = myPig.openIterator("E");
- assertEquals(expectedResults.size(), counter);
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(2,3)",
+ "(3,4)"
+ });
+
+ counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
+
+ assertEquals(expectedResults.size(), counter);
}
@Test
- public void testMultiQueryJiraPig1252() {
+ public void testMultiQueryJiraPig1252() throws Exception {
// test case: Problems with secondary key optimization and multiquery
// diamond optimization
String INPUT_FILE = "abc";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("1\t2\t3");
- w.println("2\t3\t4");
- w.println("3\t\t5");
- w.println("5\t6\t6");
- w.println("6\t\t7");
- w.close();
-
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
- myPig.setBatchOn();
-
- myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1, col2, col3);");
- myPig.registerQuery("B = foreach A generate (chararray) col1, " +
- "(chararray) ((col2 is not null) ? " +
- "col2 : (col3 < 6 ? col3 : '')) as splitcond;");
- myPig.registerQuery("split B into C if splitcond != '', D if splitcond == '';");
- myPig.registerQuery("E = group C by splitcond;");
- myPig.registerQuery("F = foreach E { orderedData = order C by $1, $0; generate flatten(orderedData); };");
-
- Iterator<Tuple> iter = myPig.openIterator("F");
-
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(1,2)",
- "(2,3)",
- "(3,5)",
- "(5,6)"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
+ String[] inputData = {
+ "1\t2\t3",
+ "2\t3\t4",
+ "3\t\t5",
+ "5\t6\t6",
+ "6\t\t7"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE, inputData);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("A = load '" + INPUT_FILE + "' as (col1, col2, col3);");
+ myPig.registerQuery("B = foreach A generate (chararray) col1, " +
+ "(chararray) ((col2 is not null) ? " +
+ "col2 : (col3 < 6 ? col3 : '')) as splitcond;");
+ myPig.registerQuery("split B into C if splitcond != '', D if splitcond == '';");
+ myPig.registerQuery("E = group C by splitcond;");
+ myPig.registerQuery("F = foreach E { orderedData = order C by $1, $0; generate flatten(orderedData); };");
+
+ Iterator<Tuple> iter = myPig.openIterator("F");
- assertEquals(expectedResults.size(), counter);
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(1,2)",
+ "(2,3)",
+ "(3,5)",
+ "(5,6)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
+
+ assertEquals(expectedResults.size(), counter);
}
@Test
- public void testMultiQueryJiraPig1169() {
+ public void testMultiQueryJiraPig1169() throws Exception {
// test case: Problems with some top N queries
String INPUT_FILE = "abc";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("1\t2\t3");
- w.println("2\t3\t4");
- w.println("3\t4\t5");
- w.println("5\t6\t7");
- w.println("6\t7\t8");
- w.close();
-
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
- myPig.setBatchOn();
-
- myPig.registerQuery("A = load '" + INPUT_FILE
- + "' as (a:int, b, c);");
- myPig.registerQuery("A1 = Order A by a desc parallel 3;");
- myPig.registerQuery("A2 = limit A1 2;");
- myPig.registerQuery("store A1 into '/tmp/input1';");
- myPig.registerQuery("store A2 into '/tmp/input2';");
+ String[] inputData = {
+ "1\t2\t3",
+ "2\t3\t4",
+ "3\t4\t5",
+ "5\t6\t7",
+ "6\t7\t8"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE, inputData);
+
+ myPig.setBatchOn();
- myPig.executeBatch();
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ + "' as (a:int, b, c);");
+ myPig.registerQuery("A1 = Order A by a desc parallel 3;");
+ myPig.registerQuery("A2 = limit A1 2;");
+ myPig.registerQuery("store A1 into 'output1';");
+ myPig.registerQuery("store A2 into 'output2';");
- myPig.registerQuery("B = load '/tmp/input2' as (a:int, b, c);");
-
- Iterator<Tuple> iter = myPig.openIterator("B");
+ myPig.executeBatch();
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(6,7,8)",
- "(5,6,7)"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
+ myPig.registerQuery("B = load 'output2' as (a:int, b, c);");
+
+ Iterator<Tuple> iter = myPig.openIterator("B");
- assertEquals(expectedResults.size(), counter);
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(6,7,8)",
+ "(5,6,7)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
+
+ assertEquals(expectedResults.size(), counter);
}
@Test
- public void testMultiQueryJiraPig1171() {
+ public void testMultiQueryJiraPig1171() throws Exception {
// test case: Problems with some top N queries
String INPUT_FILE = "abc";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("1\tapple\t3");
- w.println("2\torange\t4");
- w.println("3\tpersimmon\t5");
- w.close();
-
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
- myPig.setBatchOn();
-
- myPig.registerQuery("A = load '" + INPUT_FILE
- + "' as (a:long, b, c);");
- myPig.registerQuery("A1 = Order A by a desc;");
- myPig.registerQuery("A2 = limit A1 1;");
- myPig.registerQuery("B = load '" + INPUT_FILE
- + "' as (a:long, b, c);");
- myPig.registerQuery("B1 = Order B by a desc;");
- myPig.registerQuery("B2 = limit B1 1;");
-
- myPig.registerQuery("C = cross A2, B2;");
-
- Iterator<Tuple> iter = myPig.openIterator("C");
+ String[] inputData = {
+ "1\tapple\t3",
+ "2\torange\t4",
+ "3\tpersimmon\t5"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE, inputData);
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(3L,'persimmon',5,3L,'persimmon',5)"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
+ myPig.setBatchOn();
- assertEquals(expectedResults.size(), counter);
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ + "' as (a:long, b, c);");
+ myPig.registerQuery("A1 = Order A by a desc;");
+ myPig.registerQuery("A2 = limit A1 1;");
+ myPig.registerQuery("B = load '" + INPUT_FILE
+ + "' as (a:long, b, c);");
+ myPig.registerQuery("B1 = Order B by a desc;");
+ myPig.registerQuery("B2 = limit B1 1;");
+
+ myPig.registerQuery("C = cross A2, B2;");
+
+ Iterator<Tuple> iter = myPig.openIterator("C");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(3L,'persimmon',5,3L,'persimmon',5)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
+
+ assertEquals(expectedResults.size(), counter);
}
@Test
- public void testMultiQueryJiraPig1157() {
+ public void testMultiQueryJiraPig1157() throws Exception {
// test case: Successive replicated joins do not generate Map Reduce plan and fails due to OOM
String INPUT_FILE = "abc";
- String INPUT_FILE_1 = "xyz";
+ String INPUT_FILE_1 = "abc";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("1\tapple\t3");
- w.println("2\torange\t4");
- w.println("3\tpersimmon\t5");
- w.close();
-
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE_1);
-
- myPig.setBatchOn();
-
- myPig.registerQuery("A = load '" + INPUT_FILE
- + "' as (a:long, b, c);");
- myPig.registerQuery("A1 = FOREACH A GENERATE a;");
- myPig.registerQuery("B = GROUP A1 BY a;");
- myPig.registerQuery("C = load '" + INPUT_FILE_1
- + "' as (x:long, y);");
- myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");
- myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");
+ String[] inputData = {
+ "1\tapple\t3",
+ "2\torange\t4",
+ "3\tpersimmon\t5"
+ };
- Iterator<Tuple> iter = myPig.openIterator("E");
+ Util.createLocalInputFile(INPUT_FILE, inputData);
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "(1L,'apple',3,1L,'apple',1L,{(1L)})",
- "(2L,'orange',4,2L,'orange',2L,{(2L)})",
- "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
+ myPig.setBatchOn();
- assertEquals(expectedResults.size(), counter);
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- Util.deleteFile(cluster, INPUT_FILE_1);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ myPig.registerQuery("A = load '" + INPUT_FILE
+ + "' as (a:long, b, c);");
+ myPig.registerQuery("A1 = FOREACH A GENERATE a;");
+ myPig.registerQuery("B = GROUP A1 BY a;");
+ myPig.registerQuery("C = load '" + INPUT_FILE_1
+ + "' as (x:long, y);");
+ myPig.registerQuery("D = JOIN C BY x, B BY group USING 'replicated';");
+ myPig.registerQuery("E = JOIN A BY a, D by x USING 'replicated';");
+
+ Iterator<Tuple> iter = myPig.openIterator("E");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "(1L,'apple',3,1L,'apple',1L,{(1L)})",
+ "(2L,'orange',4,2L,'orange',2L,{(2L)})",
+ "(3L,'persimmon',5,3L,'persimmon',3L,{(3L)})"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
+
+ assertEquals(expectedResults.size(), counter);
}
@Test
- public void testMultiQueryJiraPig1068() {
+ public void testMultiQueryJiraPig1068() throws Exception {
// test case: COGROUP fails with 'Type mismatch in key from map:
// expected org.apache.pig.impl.io.NullableText, recieved org.apache.pig.impl.io.NullableTuple'
String INPUT_FILE = "pig-1068.txt";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("10\tapple\tlogin\tjar");
- w.println("20\torange\tlogin\tbox");
- w.println("30\tstrawberry\tquit\tbot");
-
- w.close();
-
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
- myPig.setBatchOn();
-
- myPig.registerQuery("logs = load '" + INPUT_FILE
- + "' as (ts:int, id:chararray, command:chararray, comments:chararray);");
- myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
- myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");
- myPig.registerQuery("logins_grouped = GROUP login_info BY (id, client);");
- myPig.registerQuery("count_logins_by_client = FOREACH logins_grouped "
- + "{ generate group.id AS id, group.client AS client, COUNT($1) AS count; };");
- myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
- myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE FLATTEN(all_quits); };");
- myPig.registerQuery("joined_session_info = COGROUP quits BY id, count_logins_by_client BY id;");
-
- Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
-
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "('apple',{},{('apple','jar',1L)})",
- "('orange',{},{('orange','box',1L)})",
- "('strawberry',{(30,'strawberry','quit','bot')},{})"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
-
- assertEquals(expectedResults.size(), counter);
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
- }
+ String[] inputData = {
+ "10\tapple\tlogin\tjar",
+ "20\torange\tlogin\tbox",
+ "30\tstrawberry\tquit\tbot"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE, inputData);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("logs = load '" + INPUT_FILE
+ + "' as (ts:int, id:chararray, command:chararray, comments:chararray);");
+ myPig.registerQuery("SPLIT logs INTO logins IF command == 'login', all_quits IF command == 'quit';");
+ myPig.registerQuery("login_info = FOREACH logins { GENERATE id as id, comments AS client; };");
+ myPig.registerQuery("logins_grouped = GROUP login_info BY (id, client);");
+ myPig.registerQuery("count_logins_by_client = FOREACH logins_grouped "
+ + "{ generate group.id AS id, group.client AS client, COUNT($1) AS count; };");
+ myPig.registerQuery("all_quits_grouped = GROUP all_quits BY id; ");
+ myPig.registerQuery("quits = FOREACH all_quits_grouped { GENERATE FLATTEN(all_quits); };");
+ myPig.registerQuery("joined_session_info = COGROUP quits BY id, count_logins_by_client BY id;");
+
+ Iterator<Tuple> iter = myPig.openIterator("joined_session_info");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('apple',{},{('apple','jar',1L)})",
+ "('orange',{},{('orange','box',1L)})",
+ "('strawberry',{(30,'strawberry','quit','bot')},{})"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
+ }
+
+ assertEquals(expectedResults.size(), counter);
}
@Test
- public void testMultiQueryJiraPig1108() {
- try {
- myPig.setBatchOn();
-
- myPig.registerQuery("a = load 'passwd' "
- + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("split a into plan1 if (uid > 5), plan2 if ( uid < 5);");
- myPig.registerQuery("b = group plan1 by uname;");
- myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; "
- + "generate flatten(group) as foo, tmp; };");
- myPig.registerQuery("d = filter c BY foo is not null;");
- myPig.registerQuery("store d into '/tmp/output1';");
- myPig.registerQuery("store plan2 into '/tmp/output2';");
-
- List<ExecJob> jobs = myPig.executeBatch();
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
+ public void testMultiQueryJiraPig1108() throws Exception {
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'passwd' "
+ + "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("split a into plan1 if (uid > 5), plan2 if ( uid < 5);");
+ myPig.registerQuery("b = group plan1 by uname;");
+ myPig.registerQuery("c = foreach b { tmp = order plan1 by uid desc; "
+ + "generate flatten(group) as foo, tmp; };");
+ myPig.registerQuery("d = filter c BY foo is not null;");
+ myPig.registerQuery("store d into 'output1';");
+ myPig.registerQuery("store plan2 into 'output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
}
@Test
- public void testMultiQueryJiraPig1114() {
+ public void testMultiQueryJiraPig1114() throws Exception {
// test case: MultiQuery optimization throws error when merging 2 level splits
String INPUT_FILE = "data.txt";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("10\tjar");
- w.println("20\tbox");
- w.println("30\tbot");
- w.close();
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
- myPig.setBatchOn();
-
- myPig.registerQuery("data = load '" + INPUT_FILE
- + "' USING PigStorage as (id:int, name:chararray);");
- myPig.registerQuery("ids = FOREACH data GENERATE id;");
- myPig.registerQuery("allId = GROUP ids all;");
- myPig.registerQuery("allIdCount = FOREACH allId GENERATE group as allId, COUNT(ids) as total;");
- myPig.registerQuery("idGroup = GROUP ids by id;");
- myPig.registerQuery("idGroupCount = FOREACH idGroup GENERATE group as id, COUNT(ids) as count;");
- myPig.registerQuery("countTotal = cross idGroupCount, allIdCount;");
- myPig.registerQuery("idCountTotal = foreach countTotal generate id, count, total, (double)count / (double)total as proportion;");
- myPig.registerQuery("orderedCounts = order idCountTotal by count desc;");
- myPig.registerQuery("STORE orderedCounts INTO '/tmp/output1';");
-
- myPig.registerQuery("names = FOREACH data GENERATE name;");
- myPig.registerQuery("allNames = GROUP names all;");
- myPig.registerQuery("allNamesCount = FOREACH allNames GENERATE group as namesAll, COUNT(names) as total;");
- myPig.registerQuery("nameGroup = GROUP names by name;");
- myPig.registerQuery("nameGroupCount = FOREACH nameGroup GENERATE group as name, COUNT(names) as count;");
- myPig.registerQuery("namesCrossed = cross nameGroupCount, allNamesCount;");
- myPig.registerQuery("nameCountTotal = foreach namesCrossed generate name, count, total, (double)count / (double)total as proportion;");
- myPig.registerQuery("nameCountsOrdered = order nameCountTotal by count desc;");
- myPig.registerQuery("STORE nameCountsOrdered INTO '/tmp/output2';");
-
- List<ExecJob> jobs = myPig.executeBatch();
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ String[] inputData = {
+ "10\tjar",
+ "20\tbox",
+ "30\tbot"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE, inputData);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("data = load '" + INPUT_FILE
+ + "' USING PigStorage as (id:int, name:chararray);");
+ myPig.registerQuery("ids = FOREACH data GENERATE id;");
+ myPig.registerQuery("allId = GROUP ids all;");
+ myPig.registerQuery("allIdCount = FOREACH allId GENERATE group as allId, COUNT(ids) as total;");
+ myPig.registerQuery("idGroup = GROUP ids by id;");
+ myPig.registerQuery("idGroupCount = FOREACH idGroup GENERATE group as id, COUNT(ids) as count;");
+ myPig.registerQuery("countTotal = cross idGroupCount, allIdCount;");
+ myPig.registerQuery("idCountTotal = foreach countTotal generate id, count, total, (double)count / (double)total as proportion;");
+ myPig.registerQuery("orderedCounts = order idCountTotal by count desc;");
+ myPig.registerQuery("STORE orderedCounts INTO 'output1';");
+
+ myPig.registerQuery("names = FOREACH data GENERATE name;");
+ myPig.registerQuery("allNames = GROUP names all;");
+ myPig.registerQuery("allNamesCount = FOREACH allNames GENERATE group as namesAll, COUNT(names) as total;");
+ myPig.registerQuery("nameGroup = GROUP names by name;");
+ myPig.registerQuery("nameGroupCount = FOREACH nameGroup GENERATE group as name, COUNT(names) as count;");
+ myPig.registerQuery("namesCrossed = cross nameGroupCount, allNamesCount;");
+ myPig.registerQuery("nameCountTotal = foreach namesCrossed generate name, count, total, (double)count / (double)total as proportion;");
+ myPig.registerQuery("nameCountsOrdered = order nameCountTotal by count desc;");
+ myPig.registerQuery("STORE nameCountsOrdered INTO 'output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
}
@Test
- public void testMultiQueryJiraPig1113() {
+ public void testMultiQueryJiraPig1113() throws Exception {
// test case: Diamond query optimization throws error in JOIN
String INPUT_FILE_1 = "set1.txt";
String INPUT_FILE_2 = "set2.txt";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE_1));
- w.println("login\t0\tjar");
- w.println("login\t1\tbox");
- w.println("quit\t0\tmany");
- w.close();
- Util.copyFromLocalToCluster(cluster, INPUT_FILE_1, INPUT_FILE_1);
-
- PrintWriter w2 = new PrintWriter(new FileWriter(INPUT_FILE_2));
- w2.println("apple\tlogin\t{(login)}");
- w2.println("orange\tlogin\t{(login)}");
- w2.println("strawberry\tquit\t{(login)}");
- w2.close();
- Util.copyFromLocalToCluster(cluster, INPUT_FILE_2, INPUT_FILE_2);
-
- myPig.setBatchOn();
+
- myPig.registerQuery("set1 = load '" + INPUT_FILE_1
- + "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
- myPig.registerQuery("set2 = load '" + INPUT_FILE_2
- + "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
- myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
- + "(chararray) 0 as f3;");
- myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
- + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
- myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
- myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");
-
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "('quit','0','many','strawberry','quit','0')",
- "('login','0','jar','apple','login','0')",
- "('login','0','jar','orange','login','0')",
- "('login','1','box','apple','login','1')",
- "('login','1','box','orange','login','1')",
- "('login','1','box','strawberry','login','1')"
- });
-
- Iterator<Tuple> iter = myPig.openIterator("joined_sets");
- int count = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(count++).toString(), iter.next().toString());
- }
- assertEquals(expectedResults.size(), count);
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE_1).delete();
- new File(INPUT_FILE_2).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE_1);
- Util.deleteFile(cluster, INPUT_FILE_2);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ String[] inputData_1 = {
+ "login\t0\tjar",
+ "login\t1\tbox",
+ "quit\t0\tmany"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE_1, inputData_1);
+
+ String[] inputData_2 = {
+ "apple\tlogin\t{(login)}",
+ "orange\tlogin\t{(login)}",
+ "strawberry\tquit\t{(login)}"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE_2, inputData_2);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("set1 = load '" + INPUT_FILE_1
+ + "' USING PigStorage as (a:chararray, b:chararray, c:chararray);");
+ myPig.registerQuery("set2 = load '" + INPUT_FILE_2
+ + "' USING PigStorage as (a: chararray, b:chararray, c:bag{});");
+ myPig.registerQuery("set2_1 = FOREACH set2 GENERATE a as f1, b as f2, "
+ + "(chararray) 0 as f3;");
+ myPig.registerQuery("set2_2 = FOREACH set2 GENERATE a as f1, "
+ + "FLATTEN((IsEmpty(c) ? null : c)) as f2, (chararray) 1 as f3;");
+ myPig.registerQuery("all_set2 = UNION set2_1, set2_2;");
+ myPig.registerQuery("joined_sets = JOIN set1 BY (a,b), all_set2 BY (f2,f3);");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('quit','0','many','strawberry','quit','0')",
+ "('login','0','jar','apple','login','0')",
+ "('login','0','jar','orange','login','0')",
+ "('login','1','box','apple','login','1')",
+ "('login','1','box','orange','login','1')",
+ "('login','1','box','strawberry','login','1')"
+ });
+
+ Iterator<Tuple> iter = myPig.openIterator("joined_sets");
+ int count = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(count++).toString(), iter.next().toString());
}
+ assertEquals(expectedResults.size(), count);
}
@Test
- public void testMultiQueryJiraPig1060_2() {
+ public void testMultiQueryJiraPig1060_2() throws Exception {
// test case:
String INPUT_FILE = "pig-1060.txt";
- try {
+ String[] inputData = {
+ "apple\t2",
+ "apple\t12",
+ "orange\t3",
+ "orange\t23",
+ "strawberry\t10",
+ "strawberry\t34"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE, inputData);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("data = load '" + INPUT_FILE +
+ "' as (name:chararray, gid:int);");
+ myPig.registerQuery("f1 = filter data by gid < 5;");
+ myPig.registerQuery("g1 = group f1 by name;");
+ myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
+ myPig.registerQuery("store p1 into 'output1';");
+
+ myPig.registerQuery("f2 = filter data by gid > 5;");
+ myPig.registerQuery("g2 = group f2 by name;");
+ myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
+ myPig.registerQuery("store p2 into 'output2';");
+
+ myPig.registerQuery("f3 = filter f2 by gid > 10;");
+ myPig.registerQuery("g3 = group f3 by name;");
+ myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
+ myPig.registerQuery("store p3 into 'output3';");
+
+ myPig.registerQuery("f4 = filter f3 by gid < 20;");
+ myPig.registerQuery("g4 = group f4 by name;");
+ myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
+ myPig.registerQuery("store p4 into 'output4';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertEquals(4, jobs.size());
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("apple\t2");
- w.println("apple\t12");
- w.println("orange\t3");
- w.println("orange\t23");
- w.println("strawberry\t10");
- w.println("strawberry\t34");
-
- w.close();
-
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
- myPig.setBatchOn();
-
- myPig.registerQuery("data = load '" + INPUT_FILE +
- "' as (name:chararray, gid:int);");
- myPig.registerQuery("f1 = filter data by gid < 5;");
- myPig.registerQuery("g1 = group f1 by name;");
- myPig.registerQuery("p1 = foreach g1 generate group, COUNT(f1.gid);");
- myPig.registerQuery("store p1 into '/tmp/output1';");
-
- myPig.registerQuery("f2 = filter data by gid > 5;");
- myPig.registerQuery("g2 = group f2 by name;");
- myPig.registerQuery("p2 = foreach g2 generate group, COUNT(f2.gid);");
- myPig.registerQuery("store p2 into '/tmp/output2';");
-
- myPig.registerQuery("f3 = filter f2 by gid > 10;");
- myPig.registerQuery("g3 = group f3 by name;");
- myPig.registerQuery("p3 = foreach g3 generate group, COUNT(f3.gid);");
- myPig.registerQuery("store p3 into '/tmp/output3';");
-
- myPig.registerQuery("f4 = filter f3 by gid < 20;");
- myPig.registerQuery("g4 = group f4 by name;");
- myPig.registerQuery("p4 = foreach g4 generate group, COUNT(f4.gid);");
- myPig.registerQuery("store p4 into '/tmp/output4';");
-
- List<ExecJob> jobs = myPig.executeBatch();
- assertEquals(4, jobs.size());
-
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
}
@Test
- public void testMultiQueryJiraPig920_2() {
+ public void testMultiQueryJiraPig920_2() throws Exception {
// test case: execution of a query with two diamonds
- try {
- myPig.setBatchOn();
- myPig.registerQuery("a = load 'passwd' " +
- "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("b = filter a by uid < 5;");
- myPig.registerQuery("c = filter a by gid >= 5;");
- myPig.registerQuery("d = filter a by uid >= 5;");
- myPig.registerQuery("e = filter a by gid < 5;");
- myPig.registerQuery("f = cogroup c by $0, b by $0;");
- myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
- myPig.registerQuery("store f1 into '/tmp/output1';");
- myPig.registerQuery("g = cogroup d by $0, e by $0;");
- myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
- myPig.registerQuery("store g1 into '/tmp/output2';");
-
- List<ExecJob> jobs = myPig.executeBatch();
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load 'passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by gid >= 5;");
+ myPig.registerQuery("d = filter a by uid >= 5;");
+ myPig.registerQuery("e = filter a by gid < 5;");
+ myPig.registerQuery("f = cogroup c by $0, b by $0;");
+ myPig.registerQuery("f1 = foreach f generate group, COUNT(c), COUNT(b);");
+ myPig.registerQuery("store f1 into 'output1';");
+ myPig.registerQuery("g = cogroup d by $0, e by $0;");
+ myPig.registerQuery("g1 = foreach g generate group, COUNT(d), COUNT(e);");
+ myPig.registerQuery("store g1 into 'output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
}
@Test
- public void testMultiQueryJiraPig920_3() {
+ public void testMultiQueryJiraPig920_3() throws Exception {
// test case: execution of a simple diamond query
String INPUT_FILE = "pig-920.txt";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("apple\tapple\t100\t10");
- w.println("apple\tapple\t200\t20");
- w.println("orange\torange\t100\t10");
- w.println("orange\torange\t300\t20");
-
- w.close();
-
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
- myPig.setBatchOn();
+ String[] inputData = {
+ "apple\tapple\t100\t10",
+ "apple\tapple\t200\t20",
+ "orange\torange\t100\t10",
+ "orange\torange\t300\t20"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE, inputData);
- myPig.registerQuery("a = load '" + INPUT_FILE +
- "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("b = filter a by uid < 300;");
- myPig.registerQuery("c = filter a by gid > 10;");
- myPig.registerQuery("d = cogroup c by $0, b by $0;");
- myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
-
- Iterator<Tuple> iter = myPig.openIterator("e");
-
- List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
- new String[] {
- "('apple',1L,2L)",
- "('orange',1L,1L)"
- });
-
- int counter = 0;
- while (iter.hasNext()) {
- assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
- }
+ myPig.setBatchOn();
- assertEquals(expectedResults.size(), counter);
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
-
+ myPig.registerQuery("a = load '" + INPUT_FILE +
+ "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 300;");
+ myPig.registerQuery("c = filter a by gid > 10;");
+ myPig.registerQuery("d = cogroup c by $0, b by $0;");
+ myPig.registerQuery("e = foreach d generate group, COUNT(c), COUNT(b);");
+
+ Iterator<Tuple> iter = myPig.openIterator("e");
+
+ List<Tuple> expectedResults = Util.getTuplesFromConstantTupleStrings(
+ new String[] {
+ "('apple',1L,2L)",
+ "('orange',1L,1L)"
+ });
+
+ int counter = 0;
+ while (iter.hasNext()) {
+ assertEquals(expectedResults.get(counter++).toString(), iter.next().toString());
}
+
+ assertEquals(expectedResults.size(), counter);
}
@Test
- public void testMultiQueryJiraPig976() {
+ public void testMultiQueryJiraPig976() throws Exception {
// test case: key ('group') isn't part of foreach output
// and keys have the same type.
- try {
- myPig.setBatchOn();
+ myPig.setBatchOn();
- myPig.registerQuery("a = load 'passwd' " +
- "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("b = group a by uid;");
- myPig.registerQuery("c = group a by gid;");
- myPig.registerQuery("d = foreach b generate SUM(a.gid);");
- myPig.registerQuery("e = foreach c generate group, COUNT(a);");
- myPig.registerQuery("store d into '/tmp/output1';");
- myPig.registerQuery("store e into '/tmp/output2';");
-
- List<ExecJob> jobs = myPig.executeBatch();
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
+ myPig.registerQuery("a = load 'passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = group a by uid;");
+ myPig.registerQuery("c = group a by gid;");
+ myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+ myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+ myPig.registerQuery("store d into 'output1';");
+ myPig.registerQuery("store e into 'output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
}
@Test
- public void testMultiQueryJiraPig976_2() {
+ public void testMultiQueryJiraPig976_2() throws Exception {
// test case: key ('group') isn't part of foreach output
// and keys have different types
- try {
- myPig.setBatchOn();
+ myPig.setBatchOn();
- myPig.registerQuery("a = load 'passwd' " +
- "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("b = group a by uname;");
- myPig.registerQuery("c = group a by gid;");
- myPig.registerQuery("d = foreach b generate SUM(a.gid);");
- myPig.registerQuery("e = foreach c generate group, COUNT(a);");
- myPig.registerQuery("store d into '/tmp/output1';");
- myPig.registerQuery("store e into '/tmp/output2';");
-
- List<ExecJob> jobs = myPig.executeBatch();
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
+ myPig.registerQuery("a = load 'passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = group a by uname;");
+ myPig.registerQuery("c = group a by gid;");
+ myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+ myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+ myPig.registerQuery("store d into 'output1';");
+ myPig.registerQuery("store e into 'output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
}
@Test
- public void testMultiQueryJiraPig976_3() {
+ public void testMultiQueryJiraPig976_3() throws Exception {
// test case: group all and key ('group') isn't part of output
- try {
- myPig.setBatchOn();
+ myPig.setBatchOn();
- myPig.registerQuery("a = load 'passwd' " +
- "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("b = group a all;");
- myPig.registerQuery("c = group a by gid;");
- myPig.registerQuery("d = foreach b generate SUM(a.gid);");
- myPig.registerQuery("e = foreach c generate group, COUNT(a);");
- myPig.registerQuery("store d into '/tmp/output1';");
- myPig.registerQuery("store e into '/tmp/output2';");
-
- List<ExecJob> jobs = myPig.executeBatch();
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
+ myPig.registerQuery("a = load 'passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = group a all;");
+ myPig.registerQuery("c = group a by gid;");
+ myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+ myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+ myPig.registerQuery("store d into 'output1';");
+ myPig.registerQuery("store e into 'output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
}
@Test
- public void testMultiQueryJiraPig976_4() {
+ public void testMultiQueryJiraPig976_4() throws Exception {
// test case: group by multi-cols and key ('group') isn't part of output
-
- try {
- myPig.setBatchOn();
+
+ myPig.setBatchOn();
- myPig.registerQuery("a = load 'passwd' " +
- "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("b = group a by uid;");
- myPig.registerQuery("c = group a by (uname, gid);");
- myPig.registerQuery("d = foreach b generate SUM(a.gid);");
- myPig.registerQuery("e = foreach c generate group.uname, group.gid, COUNT(a);");
- myPig.registerQuery("store d into '/tmp/output1';");
- myPig.registerQuery("store e into '/tmp/output2';");
-
- List<ExecJob> jobs = myPig.executeBatch();
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
+ myPig.registerQuery("a = load 'passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = group a by uid;");
+ myPig.registerQuery("c = group a by (uname, gid);");
+ myPig.registerQuery("d = foreach b generate SUM(a.gid);");
+ myPig.registerQuery("e = foreach c generate group.uname, group.gid, COUNT(a);");
+ myPig.registerQuery("store d into 'output1';");
+ myPig.registerQuery("store e into 'output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
}
@Test
- public void testMultiQueryJiraPig976_5() {
+ public void testMultiQueryJiraPig976_5() throws Exception {
// test case: key ('group') in multiple positions.
- try {
- myPig.setBatchOn();
+ myPig.setBatchOn();
- myPig.registerQuery("a = load 'passwd' " +
- "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("b = group a by uid;");
- myPig.registerQuery("c = group a by (uname, gid);");
- myPig.registerQuery("d = foreach b generate SUM(a.gid), group, group as foo;");
- myPig.registerQuery("d1 = foreach d generate $1 + $2;");
- myPig.registerQuery("e = foreach c generate group, COUNT(a);");
- myPig.registerQuery("store d1 into '/tmp/output1';");
- myPig.registerQuery("store e into '/tmp/output2';");
-
- List<ExecJob> jobs = myPig.executeBatch();
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
+ myPig.registerQuery("a = load 'passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = group a by uid;");
+ myPig.registerQuery("c = group a by (uname, gid);");
+ myPig.registerQuery("d = foreach b generate SUM(a.gid), group, group as foo;");
+ myPig.registerQuery("d1 = foreach d generate $1 + $2;");
+ myPig.registerQuery("e = foreach c generate group, COUNT(a);");
+ myPig.registerQuery("store d1 into 'output1';");
+ myPig.registerQuery("store e into 'output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
}
@Test
- public void testMultiQueryJiraPig976_6() {
+ public void testMultiQueryJiraPig976_6() throws Exception {
// test case: key ('group') has null values.
String INPUT_FILE = "pig-976.txt";
- try {
-
- PrintWriter w = new PrintWriter(new FileWriter(INPUT_FILE));
- w.println("apple\tapple\t100\t10");
- w.println("apple\tapple\t\t20");
- w.println("orange\torange\t100\t10");
- w.println("orange\torange\t\t20");
- w.println("strawberry\tstrawberry\t300\t10");
-
- w.close();
-
- Util.copyFromLocalToCluster(cluster, INPUT_FILE, INPUT_FILE);
-
- myPig.setBatchOn();
-
- myPig.registerQuery("a = load '" + INPUT_FILE +
- "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("b = group a by uid;");
- myPig.registerQuery("c = group a by gid;");
- myPig.registerQuery("d = foreach b generate group, SUM(a.gid);");
- myPig.registerQuery("e = foreach c generate COUNT(a), group;");
- myPig.registerQuery("store d into '/tmp/output1';");
- myPig.registerQuery("store e into '/tmp/output2';");
+ String[] inputData = {
+ "apple\tapple\t100\t10",
+ "apple\tapple\t\t20",
+ "orange\torange\t100\t10",
+ "orange\torange\t\t20",
+ "strawberry\tstrawberry\t300\t10"
+ };
+
+ Util.createLocalInputFile(INPUT_FILE, inputData);
+
+ myPig.setBatchOn();
+
+ myPig.registerQuery("a = load '" + INPUT_FILE +
+ "' as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = group a by uid;");
+ myPig.registerQuery("c = group a by gid;");
+ myPig.registerQuery("d = foreach b generate group, SUM(a.gid);");
+ myPig.registerQuery("e = foreach c generate COUNT(a), group;");
+ myPig.registerQuery("store d into 'output1';");
+ myPig.registerQuery("store e into 'output2';");
- List<ExecJob> jobs = myPig.executeBatch();
- assertTrue(jobs.size() == 2);
-
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
-
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- } finally {
- new File(INPUT_FILE).delete();
- try {
- Util.deleteFile(cluster, INPUT_FILE);
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
-
+ List<ExecJob> jobs = myPig.executeBatch();
+ assertTrue(jobs.size() == 2);
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
}
}
@Test
- public void testMultiQueryJiraPig983_2() {
+ public void testMultiQueryJiraPig983_2() throws Exception {
System.out.println("===== multi-query Jira Pig-983_2 =====");
- try {
- myPig.setBatchOn();
+ myPig.setBatchOn();
- myPig.registerQuery("a = load 'passwd' " +
- "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
- myPig.registerQuery("b = filter a by uid < 5;");
- myPig.registerQuery("c = filter a by uid >= 5;");
- myPig.registerQuery("d = join b by uname, c by uname;");
- myPig.registerQuery("e = group d by b::gid;");
- myPig.registerQuery("e1 = foreach e generate group, COUNT(d.b::uid);");
- myPig.registerQuery("store e1 into '/tmp/output1';");
- myPig.registerQuery("f = group d by c::gid;");
- myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
- myPig.registerQuery("store f1 into '/tmp/output2';");
-
- List<ExecJob> jobs = myPig.executeBatch();
+ myPig.registerQuery("a = load 'passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5;");
+ myPig.registerQuery("d = join b by uname, c by uname;");
+ myPig.registerQuery("e = group d by b::gid;");
+ myPig.registerQuery("e1 = foreach e generate group, COUNT(d.b::uid);");
+ myPig.registerQuery("store e1 into 'output1';");
+ myPig.registerQuery("f = group d by c::gid;");
+ myPig.registerQuery("f1 = foreach f generate group, SUM(d.c::uid);");
+ myPig.registerQuery("store f1 into 'output2';");
+
+ List<ExecJob> jobs = myPig.executeBatch();
- assertTrue(jobs.size() == 2);
-
- for (ExecJob job : jobs) {
- assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
- }
- } catch (Exception e) {
- e.printStackTrace();
- Assert.fail();
- }
+ assertTrue(jobs.size() == 2);
+
+ for (ExecJob job : jobs) {
+ assertTrue(job.getStatus() == ExecJob.JOB_STATUS.COMPLETED);
+ }
}
// --------------------------------------------------------------------------
// Helper methods
- private void deleteOutputFiles() {
- try {
- FileLocalizer.delete("/tmp/output1", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output2", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output3", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output4", myPig.getPigContext());
- FileLocalizer.delete("/tmp/output5", myPig.getPigContext());
- } catch (IOException e) {
- e.printStackTrace();
- Assert.fail();
- }
+ private static void deleteOutputFiles() {
+ Util.deleteDirectory(new File("output1"));
+ Util.deleteDirectory(new File("output2"));
+ Util.deleteDirectory(new File("output3"));
+ Util.deleteDirectory(new File("output4"));
+ Util.deleteDirectory(new File("output5"));
}
}