You are viewing a plain-text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ol...@apache.org on 2009/05/04 22:50:32 UTC

svn commit: r771437 [2/2] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src/org/apache/pig/backend/hadoop/executionengin...

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestMultiQuery.java Mon May  4 20:50:31 2009
@@ -181,6 +181,342 @@
     }
     
     @Test
+    public void testMultiQueryPhase3BaseCase() {
+
+        System.out.println("===== multi-query phase 3 base case =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");
+            myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");            
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+            checkMRPlan(pp, 1, 1, 1);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }     
+    
+    @Test
+    public void testMultiQueryPhase3BaseCase2() {
+
+        System.out.println("===== multi-query phase 3 base case (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");
+            myPig.registerQuery("d2 = foreach d1 generate group, AVG(d.uid);");            
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3WithoutCombiner() {
+
+        System.out.println("===== multi-query phase 3 without combiner =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");           
+            myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+            checkMRPlan(pp, 1, 1, 1);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }     
+    
+    @Test
+    public void testMultiQueryPhase3WithoutCombiner2() {
+
+        System.out.println("===== multi-query phase 3 without combiner (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid) + SUM(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid) - COUNT(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");           
+            myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }     
+    
+    @Test
+    public void testMultiQueryPhase3WithMixedCombiner() {
+
+        System.out.println("===== multi-query phase 3 with mixed combiner =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");            
+            myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+            checkMRPlan(pp, 1, 1, 2);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3WithMixedCombiner2() {
+
+        System.out.println("===== multi-query phase 3 with mixed combiner (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by gid;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by gid;");            
+            myPig.registerQuery("d2 = foreach d1 generate group, MAX(d.uid) - MIN(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3WithDifferentMapDataTypes() {
+
+        System.out.println("===== multi-query phase 3 with different map datatypes =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid parallel 2;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by $1 parallel 3;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by $1 parallel 4;");
+            myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 19);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
+
+            checkMRPlan(pp, 1, 1, 1);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+
+    @Test
+    public void testMultiQueryPhase3WithDifferentMapDataTypes2() {
+
+        System.out.println("===== multi-query phase 3 with different map datatypes (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("a = load 'file:test/org/apache/pig/test/data/passwd' " +
+                                 "using PigStorage(':') as (uname:chararray, passwd:chararray, uid:int, gid:int);");
+            myPig.registerQuery("b = filter a by uid < 5;");
+            myPig.registerQuery("c = filter a by uid >= 5 and uid < 10;");
+            myPig.registerQuery("d = filter a by uid >= 10;");
+            myPig.registerQuery("b1 = group b by gid;");
+            myPig.registerQuery("b2 = foreach b1 generate group, COUNT(b.uid);");
+            myPig.registerQuery("b3 = filter b2 by $1 > 5;");
+            myPig.registerQuery("store b3 into '/tmp/output1';");
+            myPig.registerQuery("c1 = group c by $1;");
+            myPig.registerQuery("c2 = foreach c1 generate group, SUM(c.uid);");
+            myPig.registerQuery("store c2 into '/tmp/output2';");
+            myPig.registerQuery("d1 = group d by $1;");
+            myPig.registerQuery("d2 = foreach d1 generate group, COUNT(d.uid);");
+            myPig.registerQuery("store d2 into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3StreamingInReducer() {
+
+        System.out.println("===== multi-query phase 3 with streaming in reducer =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';");
+            myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;");
+            myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';");
+            myPig.registerQuery("B = group A3 by $2;");
+            myPig.registerQuery("C = foreach B generate flatten(A3);");
+            myPig.registerQuery("D = stream B through `cat`;");
+            myPig.registerQuery("store D into '/tmp/output1';");
+            myPig.registerQuery("E = group A4 by $2;");
+            myPig.registerQuery("F = foreach E generate group, COUNT(A4);");
+            myPig.registerQuery("store F into '/tmp/output2';");            
+            myPig.registerQuery("G = group A1 by $2;");
+            myPig.registerQuery("H = foreach G generate group, COUNT(A1);");          
+            myPig.registerQuery("store H into '/tmp/output3';");
+             
+            LogicalPlan lp = checkLogicalPlan(1, 3, 16);
+
+            PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 24);
+
+            checkMRPlan(pp, 1, 1, 2);
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }         
+    
+    @Test
+    public void testMultiQueryPhase3StreamingInReducer2() {
+
+        System.out.println("===== multi-query phase 3 with streaming in reducer (2) =====");
+
+        try {
+            myPig.setBatchOn();
+
+            myPig.registerQuery("A = load 'file:test/org/apache/pig/test/data/passwd' split by 'file';");
+            myPig.registerQuery("Split A into A1 if $2 > 5, A2 if $2 >= 5;");
+            myPig.registerQuery("Split A1 into A3 if $0 > 'm', A4 if $0 >= 'm';");
+            myPig.registerQuery("B = group A3 by $2;");
+            myPig.registerQuery("C = foreach B generate flatten(A3);");
+            myPig.registerQuery("D = stream B through `cat`;");
+            myPig.registerQuery("store D into '/tmp/output1';");
+            myPig.registerQuery("E = group A4 by $2;");
+            myPig.registerQuery("F = foreach E generate group, COUNT(A4);");
+            myPig.registerQuery("store F into '/tmp/output2';");            
+            myPig.registerQuery("G = group A1 by $2;");
+            myPig.registerQuery("H = foreach G generate group, COUNT(A1);");          
+            myPig.registerQuery("store H into '/tmp/output3';");
+             
+            myPig.executeBatch();
+            
+        } catch (Exception e) {
+            e.printStackTrace();
+            Assert.fail();
+        } 
+    }       
+    
+    @Test
     public void testMultiQueryWithPigMixL12() {
 
         System.out.println("===== multi-query with PigMix L12 =====");
@@ -207,7 +543,7 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 23);
 
-            checkMRPlan(pp, 1, 2, 3); 
+            checkMRPlan(pp, 1, 1, 1); 
             
         } catch (Exception e) {
             e.printStackTrace();
@@ -695,7 +1031,7 @@
 
             PhysicalPlan pp = checkPhysicalPlan(lp, 1, 3, 25);
 
-            checkMRPlan(pp, 1, 2, 3);
+            checkMRPlan(pp, 1, 1, 1);
             
         } catch (Exception e) {
             e.printStackTrace();

Modified: hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java?rev=771437&r1=771436&r2=771437&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/utils/GenPhyOp.java Mon May  4 20:50:31 2009
@@ -26,6 +26,7 @@
 import org.apache.pig.FuncSpec;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.builtin.BinStorage;
+import org.apache.pig.builtin.PigStorage;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
@@ -37,9 +38,6 @@
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.*;
 import org.apache.pig.impl.plan.PlanException;
 
@@ -755,6 +753,12 @@
         PORead ret = new PORead(new OperatorKey("", r.nextLong()), bag);
         return ret;
     }
+    
+    public static POStore dummyPigStorageOp() {
+        POStore ret = new POStore(new OperatorKey("", r.nextLong()));
+        ret.setSFile(new FileSpec("DummyFil", new FuncSpec(PigStorage.class.getName() + "()")));
+        return ret;
+    }
 
     public static POStore topStoreOp() {
         POStore ret = new POStore(new OperatorKey("", r.nextLong()));