You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ol...@apache.org on 2009/05/04 22:50:32 UTC
svn commit: r771437 [2/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/
src/org/apache/pig/backend/hadoop/executionengin...
Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Mon May 4 20:50:31 2009
@@ -181,6 +181,342 @@
}
@Test
+ public void testMultiQueryPhase3BaseCase() {
+ // Plan check: registers a three-branch script in batch mode and passes it to the plan check helpers; executeBatch() is not called in this method.
+ System.out.println("===== multi-query phase 3 base case =====");
+
+ try {
+ myPig.setBatchOn();
+ // One load of the passwd file is filtered by uid into b, c and d; each branch is grouped by gid, reduced with a single aggregate (COUNT, SUM, AVG) and stored to its own output. Branch b also filters on the count.
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+ // The numeric arguments are expected values for the check helpers, which are defined outside this view (presumably root/leaf/size counts -- TODO confirm).
+ LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3BaseCase2() {
+ // Execution variant: registers the three-branch single-aggregate script in batch mode and runs it with executeBatch(); no plan checks are made here.
+ System.out.println("===== multi-query phase 3 base case (2) =====");
+
+ try {
+ myPig.setBatchOn();
+ // One load of the passwd file is filtered by uid into b, c and d; each branch is grouped by gid, reduced with a single aggregate (COUNT, SUM, AVG) and stored to its own output. Branch b also filters on the count.
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+ // The test passes as long as running the batch does not throw; the stored outputs are not inspected in this method.
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithoutCombiner() {
+ // Plan check: registers a three-branch script whose generated columns are arithmetic over two aggregates, then passes it to the plan check helpers; executeBatch() is not called.
+ System.out.println("===== multi-query phase 3 without combiner =====");
+
+ try {
+ myPig.setBatchOn();
+ // Branches b, c and d (filtered by uid, grouped by gid) generate COUNT + SUM, SUM - COUNT and MAX - MIN respectively. NOTE(review): per the test name these are meant to be cases where no combiner is used -- confirm.
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+ // The numeric arguments are expected values for the check helpers, which are defined outside this view (presumably root/leaf/size counts -- TODO confirm).
+ LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithoutCombiner2() {
+ // Execution variant: registers the three-branch script whose generated columns are arithmetic over two aggregates and runs it with executeBatch(); no plan checks are made here.
+ System.out.println("===== multi-query phase 3 without combiner (2) =====");
+
+ try {
+ myPig.setBatchOn();
+ // Branches b, c and d (filtered by uid, grouped by gid) generate COUNT + SUM, SUM - COUNT and MAX - MIN respectively. NOTE(review): per the test name these are meant to be cases where no combiner is used -- confirm.
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+ // The test passes as long as running the batch does not throw; the stored outputs are not inspected in this method.
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithMixedCombiner() {
+ // Plan check: registers a three-branch script mixing single-aggregate branches with one two-aggregate expression, then passes it to the plan check helpers; executeBatch() is not called.
+ System.out.println("===== multi-query phase 3 with mixed combiner =====");
+
+ try {
+ myPig.setBatchOn();
+ // Branches b and c generate a single aggregate (COUNT, SUM) while branch d generates MAX - MIN. NOTE(review): per the test name this mixes combiner-eligible and non-eligible branches -- confirm.
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+ // The numeric arguments are expected values for the check helpers, which are defined outside this view (presumably root/leaf/size counts -- TODO confirm).
+ LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+ checkMRPlan(pp, 1, 1, 2); // last expected value is 2 here, unlike the 1 used by testMultiQueryPhase3BaseCase
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithMixedCombiner2() {
+ // Execution variant: registers the script mixing single-aggregate branches with one two-aggregate expression and runs it with executeBatch(); no plan checks are made here.
+ System.out.println("===== multi-query phase 3 with mixed combiner (2) =====");
+
+ try {
+ myPig.setBatchOn();
+ // Branches b and c generate a single aggregate (COUNT, SUM) while branch d generates MAX - MIN. NOTE(review): per the test name this mixes combiner-eligible and non-eligible branches -- confirm.
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by gid;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by gid;");
+ myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+ // The test passes as long as running the batch does not throw; the stored outputs are not inspected in this method.
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithDifferentMapDataTypes() {
+ // Plan check: registers a three-branch script whose group keys have different declared types, then passes it to the plan check helpers; executeBatch() is not called.
+ System.out.println("===== multi-query phase 3 with different map datatypes =====");
+
+ try {
+ myPig.setBatchOn();
+ // Branch b is grouped by gid (declared int) while c and d are grouped by $1 (passwd, declared chararray); the three group statements also request parallel 2, 3 and 4.
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid parallel 2;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by $1 parallel 3;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by $1 parallel 4;");
+ myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+ // The numeric arguments are expected values for the check helpers, which are defined outside this view (presumably root/leaf/size counts -- TODO confirm).
+ LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+ checkMRPlan(pp, 1, 1, 1);
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3WithDifferentMapDataTypes2() {
+ // Execution variant: registers a three-branch script whose group keys have different declared types and runs it with executeBatch(); no plan checks are made here.
+ System.out.println("===== multi-query phase 3 with different map datatypes (2) =====");
+
+ try {
+ myPig.setBatchOn();
+ // Branch b is grouped by gid (declared int) while c and d are grouped by $1 (passwd, declared chararray). Unlike testMultiQueryPhase3WithDifferentMapDataTypes, the group statements carry no parallel clause.
+ myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+ "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+ myPig.registerQuery("b = filter a by uid < 5;");
+ myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+ myPig.registerQuery("d = filter a by uid >= 10;");
+ myPig.registerQuery("b1 = group b by gid;");
+ myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+ myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+ myPig.registerQuery("store b3 into '/tmp/output1';");
+ myPig.registerQuery("c1 = group c by $1;");
+ myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+ myPig.registerQuery("store c2 into '/tmp/output2';");
+ myPig.registerQuery("d1 = group d by $1;");
+ myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
+ myPig.registerQuery("store d2 into '/tmp/output3';");
+ // The test passes as long as running the batch does not throw; the stored outputs are not inspected in this method.
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3StreamingInReducer() {
+ // Plan check: registers a script with nested splits, three group-by branches and a stream operator after one group, then passes it to the plan check helpers; executeBatch() is not called.
+ System.out.println("===== multi-query phase 3 with streaming in reducer =====");
+
+ try {
+ myPig.setBatchOn();
+ // A is loaded without a declared schema and split twice; A3, A4 and A1 are each grouped by $2. The A3 group is piped through `cat` and stored, the other two are counted and stored.
+ myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';");
+ myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;"); // NOTE(review): A2 is not referenced by any later statement in this script
+ myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';");
+ myPig.registerQuery("B = group A3 by $2;");
+ myPig.registerQuery("C = foreach B generate flatten(A3);"); // NOTE(review): C is not referenced by any later statement; D streams B directly
+ myPig.registerQuery("D = stream B through `cat`;");
+ myPig.registerQuery("store D into '/tmp/output1';");
+ myPig.registerQuery("E = group A4 by $2;");
+ myPig.registerQuery("F = foreach E generate group, COUNT(A4);");
+ myPig.registerQuery("store F into '/tmp/output2';");
+ myPig.registerQuery("G = group A1 by $2;");
+ myPig.registerQuery("H = foreach G generate group, COUNT(A1);");
+ myPig.registerQuery("store H into '/tmp/output3';");
+ // The numeric arguments are expected values for the check helpers, which are defined outside this view (presumably root/leaf/size counts -- TODO confirm).
+ LogicalPlan lp = checkLogicalPlan(1, 3, 16);
+
+ PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 24);
+
+ checkMRPlan(pp, 1, 1, 2);
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
+ public void testMultiQueryPhase3StreamingInReducer2() {
+ // Execution variant: registers the script with nested splits, three group-by branches and a stream operator after one group, and runs it with executeBatch(); no plan checks are made here.
+ System.out.println("===== multi-query phase 3 with streaming in reducer (2) =====");
+
+ try {
+ myPig.setBatchOn();
+ // A is loaded without a declared schema and split twice; A3, A4 and A1 are each grouped by $2. The A3 group is piped through `cat` and stored, the other two are counted and stored.
+ myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';");
+ myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;"); // NOTE(review): A2 is not referenced by any later statement in this script
+ myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';");
+ myPig.registerQuery("B = group A3 by $2;");
+ myPig.registerQuery("C = foreach B generate flatten(A3);"); // NOTE(review): C is not referenced by any later statement; D streams B directly
+ myPig.registerQuery("D = stream B through `cat`;");
+ myPig.registerQuery("store D into '/tmp/output1';");
+ myPig.registerQuery("E = group A4 by $2;");
+ myPig.registerQuery("F = foreach E generate group, COUNT(A4);");
+ myPig.registerQuery("store F into '/tmp/output2';");
+ myPig.registerQuery("G = group A1 by $2;");
+ myPig.registerQuery("H = foreach G generate group, COUNT(A1);");
+ myPig.registerQuery("store H into '/tmp/output3';");
+ // The test passes as long as running the batch does not throw; the stored outputs are not inspected in this method.
+ myPig.executeBatch();
+
+ } catch (Exception e) {
+ e.printStackTrace(); // any exception is printed, then reported as a test failure
+ Assert.fail();
+ }
+ }
+
+ @Test
public void testMultiQueryWithPigMixL12() {
System.out.println("===== multi-query with PigMix L12 =====");
@@ -207,7 +543,7 @@
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 23);
- checkMRPlan(pp, 1, 2, 3);
+ checkMRPlan(pp, 1, 1, 1);
} catch (Exception e) {
e.printStackTrace();
@@ -695,7 +1031,7 @@
PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
- checkMRPlan(pp, 1, 2, 3);
+ checkMRPlan(pp, 1, 1, 1);
} catch (Exception e) {
e.printStackTrace();
Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Mon May 4 20:50:31 2009
@@ -26,6 +26,7 @@
import org.apache.pig.FuncSpec;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
@@ -37,9 +38,6 @@
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
import org.apache.pig.impl.plan.PlanException;
@@ -755,6 +753,12 @@
PORead ret = new PORead(new OperatorKey("", r.nextLong()), bag);
return ret;
}
+ /** Builds a POStore with a randomly keyed OperatorKey whose store file spec is named "DummyFil" and uses a no-argument PigStorage FuncSpec. */
+ public static POStore dummyPigStorageOp() {
+ POStore ret = new POStore(new OperatorKey("", r.nextLong())); // empty scope string; r is declared outside this view
+ ret.setSFile(new FileSpec("DummyFil", new FuncSpec(PigStorage.class.getName() + "()")));
+ return ret;
+ }
public static POStore topStoreOp() {
POStore ret = new POStore(new OperatorKey("", r.nextLong()));