You are viewing a plain-text version of this content. The canonical link to it (the "here" hyperlink on the original page) is not preserved in this plain-text copy.
Posted to commits@pig.apache.org by da...@apache.org on 2010/03/15 04:28:28 UTC

svn commit: r923043 [4/5] - in /hadoop/pig/trunk: src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logical/optimizer/ src/org...

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java Mon Mar 15 03:28:27 2010
@@ -21,17 +21,22 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.util.List;
+
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Add;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Divide;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Mod;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Multiply;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POBinCond;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PONegative;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.PORelationToExprProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POUserFunc;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.Subtract;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
@@ -41,12 +46,14 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
 import org.apache.pig.data.DataType;
 import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.experimental.logical.expression.AddExpression;
+import org.apache.pig.experimental.logical.expression.BagDereferenceExpression;
+import org.apache.pig.experimental.logical.expression.BinCondExpression;
 import org.apache.pig.experimental.logical.expression.DivideExpression;
 import org.apache.pig.experimental.logical.expression.IsNullExpression;
+import org.apache.pig.experimental.logical.expression.LessThanExpression;
 import org.apache.pig.experimental.logical.expression.LogicalExpression;
 import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.experimental.logical.expression.ModExpression;
@@ -55,7 +62,7 @@ import org.apache.pig.experimental.logic
 import org.apache.pig.experimental.logical.expression.NotExpression;
 import org.apache.pig.experimental.logical.expression.ProjectExpression;
 import org.apache.pig.experimental.logical.expression.SubtractExpression;
-import org.apache.pig.experimental.logical.optimizer.PlanPrinter;
+import org.apache.pig.experimental.logical.expression.UserFuncExpression;
 import org.apache.pig.experimental.logical.optimizer.UidStamper;
 import org.apache.pig.experimental.logical.relational.LOFilter;
 import org.apache.pig.experimental.logical.relational.LOForEach;
@@ -66,7 +73,6 @@ import org.apache.pig.experimental.logic
 import org.apache.pig.experimental.logical.relational.LogicalSchema;
 import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
 import org.apache.pig.experimental.plan.OperatorPlan;
-import org.apache.pig.impl.logicalLayer.LOIsNull;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.test.utils.LogicalPlanTester;
@@ -891,10 +897,6 @@ public class TestExperimentalLogToPhyTra
         
         PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
         
-        printPlan(plan);
-        printPlan(newLogicalPlan);
-        printPlan(phyPlan);
-        
         assertEquals(1, ls.getField(0).uid);
         assertEquals(2, ls.getField(1).uid);
         
@@ -933,10 +935,6 @@ public class TestExperimentalLogToPhyTra
         
         PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
         
-        printPlan(plan);
-        printPlan(newLogicalPlan);
-        printPlan(phyPlan);
-        
         assertEquals(1, ls.getField(0).uid);
         assertEquals(2, ls.getField(1).uid);
         
@@ -965,26 +963,390 @@ public class TestExperimentalLogToPhyTra
         assertEquals( ls.getField(0).uid, prj.getUid() );
     }
     
-    public void printPlan(org.apache.pig.experimental.logical.relational.LogicalPlan logicalPlan ) throws Exception {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        PrintStream ps = new PrintStream(out);
-        PlanPrinter pp = new PlanPrinter(logicalPlan,ps);
-        pp.visit();
-        System.err.println(out.toString());
-    }
-    
-    public void printPlan(LogicalPlan logicalPlan) throws Exception {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        PrintStream ps = new PrintStream(out);
-        logicalPlan.explain(ps, "text", true);
-        System.err.println(out.toString());
-    }
-    
-    public void printPlan(PhysicalPlan physicalPlan) throws Exception {
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        PrintStream ps = new PrintStream(out);
-        physicalPlan.explain(ps, "text", true);
-        System.err.println(out.toString());
+    public void testPlanwithBinCond() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (a:int, b:int);");
+        lpt.buildPlan("b = foreach a generate ( a < b ? b : a );");
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        // Migrate the parsed plan to the experimental logical plan and read the load's schema.
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        LogicalRelationalOperator ld = (LogicalRelationalOperator)newLogicalPlan.getSources().get(0);
+        assertEquals( LOLoad.class, ld.getClass() );
+        LOLoad load = (LOLoad)ld;
+        LogicalSchema ls = load.getSchema();
+
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        // The ForEach should follow the first root; its first input plan holds the bincond.
+        PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0);
+        assertEquals( POForEach.class, pFE.getClass() );
+        POForEach pForEach = (POForEach)pFE;
+        PhysicalPlan inputPln = pForEach.getInputPlans().get(0);
+        // The two load fields (a, b) are expected to carry uids 1 and 2.
+        assertEquals(1, ls.getField(0).uid);
+        assertEquals(2, ls.getField(1).uid);
+
+        LogicalRelationalOperator fe =
+            (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0);
+        assertEquals( LOForEach.class, fe.getClass() );
+        LOForEach forEach = (LOForEach)fe;
+
+        org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan =
+            forEach.getInnerPlan();
+        // The inner plan's only sink is a generate with a single output expression.
+        assertEquals( 1, innerPlan.getSinks().size() );
+        assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() );
+        LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
+        assertEquals( 1, gen.getOutputPlans().size() );
+        LogicalExpressionPlan genExp = gen.getOutputPlans().get(0);
+
+        assertEquals( 1, genExp.getSources().size() );
+
+        // Logical side: ( a < b ? b : a ) must become BinCond( LessThan(a, b), b, a ).
+        assertEquals( BinCondExpression.class, genExp.getSources().get(0).getClass() );
+        BinCondExpression binCondExp = (BinCondExpression) genExp.getSources().get(0);
+        assertEquals( LessThanExpression.class, binCondExp.getCondition().getClass() );
+        LessThanExpression lessThan = (LessThanExpression) binCondExp.getCondition();
+        assertEquals( ProjectExpression.class, lessThan.getLhs().getClass() );
+        ProjectExpression prj1 = (ProjectExpression) lessThan.getLhs();
+        assertEquals( ls.getField(0).uid, prj1.getUid() );
+        assertEquals( ProjectExpression.class, lessThan.getRhs().getClass() );
+        ProjectExpression prj2 = (ProjectExpression) lessThan.getRhs();
+        assertEquals( ls.getField(1).uid, prj2.getUid() );
+
+        assertEquals( ProjectExpression.class, binCondExp.getLhs().getClass() );
+        ProjectExpression prj3 = (ProjectExpression) binCondExp.getLhs();
+        assertEquals( ls.getField(1).uid, prj3.getUid() );
+        assertEquals( ProjectExpression.class, binCondExp.getRhs().getClass() );
+        ProjectExpression prj4 = (ProjectExpression) binCondExp.getRhs();
+        assertEquals( ls.getField(0).uid, prj4.getUid() );
+        // Physical side: four POProject roots (a, b for the condition; b, a for the branches)
+        // feed one POBinCond leaf whose operands mirror the logical expression.
+        assertEquals( 4, inputPln.getRoots().size() );
+        for( PhysicalOperator p : inputPln.getRoots() ) {
+            assertEquals( POProject.class, p.getClass() );
+        }
+        assertEquals( 1, inputPln.getLeaves().size() );
+        assertEquals( POBinCond.class, inputPln.getLeaves().get(0).getClass() );
+        POBinCond binCond = (POBinCond) inputPln.getLeaves().get(0);
+        assertEquals( POProject.class, binCond.getLhs().getClass() );
+        POProject prj_1 = (POProject)binCond.getLhs();
+        assertEquals( 1, prj_1.getColumn() );
+        assertEquals( POProject.class, binCond.getRhs().getClass() );
+        POProject prj_2 = (POProject) binCond.getRhs();
+        assertEquals( 0, prj_2.getColumn() );
+        assertEquals( LessThanExpr.class, binCond.getCond().getClass() );
+        LessThanExpr lessThan_p = (LessThanExpr) binCond.getCond();
+
+        assertEquals( POProject.class, lessThan_p.getLhs().getClass() );
+        POProject prj_3 = (POProject) lessThan_p.getLhs();
+        assertEquals( 0, prj_3.getColumn() );
+        assertEquals( POProject.class, lessThan_p.getRhs().getClass() );
+        POProject prj_4 = (POProject) lessThan_p.getRhs();
+        assertEquals( 1, prj_4.getColumn() );
     }
     
+    public void testPlanwithUserFunc() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (a:int, b:bag{t:tuple(b_a:int,b_b:int)});");
+        lpt.buildPlan("b = foreach a generate a,COUNT(b);");
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        // Migrate the parsed plan to the experimental logical plan and read the load's schema.
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        LogicalRelationalOperator ld =  (LogicalRelationalOperator)newLogicalPlan.getSources().get(0);
+        assertEquals( LOLoad.class, ld.getClass() );
+        LOLoad load = (LOLoad)ld;
+        LogicalSchema ls = load.getSchema();
+        // Translate to a physical plan; the ForEach is expected right after the first root.
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        
+        PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0);
+        assertEquals( POForEach.class, pFE.getClass() );
+        POForEach pForEach = (POForEach)pFE;
+        PhysicalPlan inputPln1 = pForEach.getInputPlans().get(0);
+        // The two load fields (a, b) are expected to carry uids 1 and 2.
+        assertEquals(1, ls.getField(0).uid);
+        assertEquals(2, ls.getField(1).uid);
+        // Logical side: the load feeds a foreach whose inner plan ends in one generate with two outputs.
+        LogicalRelationalOperator fe = 
+            (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0);
+        assertEquals( LOForEach.class, fe.getClass() );
+        LOForEach forEach = (LOForEach)fe;
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = 
+            forEach.getInnerPlan();
+        
+        assertEquals( 1, innerPlan.getSinks().size() );        
+        assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() );
+        LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
+        assertEquals( 2, gen.getOutputPlans().size() );
+        // Output 0: a plain projection of field a.
+        LogicalExpressionPlan genExp1 = gen.getOutputPlans().get(0);
+        
+        assertEquals( 1, genExp1.getSources().size() );
+        assertEquals( ProjectExpression.class, genExp1.getSources().get(0).getClass() );
+        ProjectExpression prj1  = (ProjectExpression) genExp1.getSources().get(0);
+        assertEquals( ls.getField(0).uid, prj1.getUid() );
+        // Output 1: COUNT(b) -- a UserFuncExpression source over a projection of field b.
+        LogicalExpressionPlan genExp2 = gen.getOutputPlans().get(1);
+        assertEquals( UserFuncExpression.class, genExp2.getSources().get(0).getClass() );
+        assertEquals( ProjectExpression.class, genExp2.getSinks().get(0).getClass() );
+        ProjectExpression prj2 = (ProjectExpression)genExp2.getSinks().get(0);
+        assertEquals( ls.getField(1).uid, prj2.getUid() );
+        // Physical side: input plan 0 projects column 0; input plan 1 is COUNT over a project of column 1.
+        assertEquals( 1, inputPln1.getLeaves().size() );
+        assertEquals( 1, inputPln1.getRoots().size() );
+        assertEquals( POProject.class, inputPln1.getLeaves().get(0).getClass() );
+        assertEquals( 0, (( POProject) inputPln1.getLeaves().get(0)).getColumn() );
+        PhysicalPlan inputPln2 = pForEach.getInputPlans().get(1);
+        assertEquals( POUserFunc.class, inputPln2.getLeaves().get(0).getClass() );
+        assertEquals( "org.apache.pig.builtin.COUNT", 
+                ((POUserFunc) inputPln2.getLeaves().get(0)).getFuncSpec().getClassName() );
+        assertEquals( POProject.class, inputPln2.getRoots().get(0).getClass() );
+        assertEquals( 1, ((POProject)inputPln2.getRoots().get(0)).getColumn() );
+        
+    }
+    
+    public void testPlanwithUserFunc2() throws Exception {
+        // This one uses BagDereferenceExpression: COUNT(b.b_a) gives the projection of b a bag dereference (column 0) as predecessor.
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (a:int, b:bag{t:tuple(b_a:int,b_b:int)});");
+        lpt.buildPlan("b = foreach a generate a,COUNT(b.b_a);");
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        // Migrate the parsed plan to the experimental logical plan and read the load's schema.
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        LogicalRelationalOperator ld =  (LogicalRelationalOperator)newLogicalPlan.getSources().get(0);
+        assertEquals( LOLoad.class, ld.getClass() );
+        LOLoad load = (LOLoad)ld;
+        LogicalSchema ls = load.getSchema();
+        // Translate to a physical plan; the ForEach is expected right after the first root.
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        
+        PhysicalOperator pFE = phyPlan.getSuccessors( phyPlan.getRoots().get(0) ).get(0);
+        assertEquals( POForEach.class, pFE.getClass() );
+        POForEach pForEach = (POForEach)pFE;
+        PhysicalPlan inputPln = pForEach.getInputPlans().get(0);
+        // The two load fields (a, b) are expected to carry uids 1 and 2.
+        assertEquals(1, ls.getField(0).uid);
+        assertEquals(2, ls.getField(1).uid);
+        // Logical side: the load feeds a foreach whose inner plan ends in one generate with two outputs.
+        LogicalRelationalOperator fe = 
+            (LogicalRelationalOperator) newLogicalPlan.getSuccessors(load).get(0);
+        assertEquals( LOForEach.class, fe.getClass() );
+        LOForEach forEach = (LOForEach)fe;
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = 
+            forEach.getInnerPlan();
+        
+        assertEquals( 1, innerPlan.getSinks().size() );        
+        assertEquals( LOGenerate.class, innerPlan.getSinks().get(0).getClass() );
+        LOGenerate gen = (LOGenerate)innerPlan.getSinks().get(0);
+        assertEquals( 2, gen.getOutputPlans().size() );
+        // Output 0: a plain projection of field a.
+        LogicalExpressionPlan genExp1 = gen.getOutputPlans().get(0);
+        
+        assertEquals( 1, genExp1.getSources().size() );
+        assertEquals( ProjectExpression.class, genExp1.getSources().get(0).getClass() );
+        ProjectExpression prj1  = (ProjectExpression) genExp1.getSources().get(0);
+        assertEquals( ls.getField(0).uid, prj1.getUid() );
+        // Output 1: COUNT(b.b_a) -- UDF source, projection of b as sink, bag dereference of column 0 as its predecessor.
+        LogicalExpressionPlan genExp2 = gen.getOutputPlans().get(1);
+        assertEquals( UserFuncExpression.class, genExp2.getSources().get(0).getClass() );
+        assertEquals( ProjectExpression.class, genExp2.getSinks().get(0).getClass() );
+        ProjectExpression prj2 = (ProjectExpression)genExp2.getSinks().get(0);
+        assertEquals( ls.getField(1).uid, prj2.getUid() );
+        assertEquals( BagDereferenceExpression.class, genExp2.getPredecessors(prj2).get(0).getClass() );
+        assertEquals( 0, ((BagDereferenceExpression)genExp2.getPredecessors(prj2).get(0)).getBagColNum() );
+        // Physical side: input plan 0 is a single projection of column 0.
+        assertEquals( 1, inputPln.getRoots().size() );
+        assertEquals( POProject.class, inputPln.getRoots().get(0).getClass() );
+        assertEquals( 0, ((POProject)inputPln.getRoots().get(0)).getColumn() );
+        // Input plan 1: the root projects column 1, its successor projects column 0, and the leaf is COUNT.
+        PhysicalPlan inputPln2 = pForEach.getInputPlans().get(1);
+        assertEquals( 1, inputPln2.getRoots().size() );
+        assertEquals( POProject.class, inputPln2.getRoots().get(0).getClass() );
+        assertEquals(1, ((POProject)inputPln2.getRoots().get(0)).getColumn() );
+        assertEquals( POUserFunc.class, inputPln2.getLeaves().get(0).getClass() );
+        assertEquals( "org.apache.pig.builtin.COUNT", 
+                ((POUserFunc)inputPln2.getLeaves().get(0)).getFuncSpec().getClassName() );
+        
+        POProject prj3 = (POProject)inputPln2.getRoots().get(0);
+        assertEquals( POProject.class, inputPln2.getSuccessors(prj3).get(0).getClass() );
+        assertEquals( 0, ((POProject)inputPln2.getSuccessors(prj3).get(0)).getColumn() );
+    }    
+    
+    public void testCogroup() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("b = group a by name;");
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        // A single-input group should translate to load -> local rearrange -> global rearrange -> package.
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        
+        assertEquals( 1, phyPlan.getRoots().size() );
+        assertEquals( POLoad.class, phyPlan.getRoots().get(0).getClass() );
+        POLoad load = (POLoad)phyPlan.getRoots().get(0);
+        // The local rearrange has one key plan: a chararray projection of column 0 (name).
+        assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load).get(0).getClass() );
+        POLocalRearrange localR = (POLocalRearrange)phyPlan.getSuccessors(load).get(0);
+        assertEquals( 1, localR.getInputs().size() );
+        assertEquals( 1, localR.getPlans().size() );
+        PhysicalPlan cogroupPlan = localR.getPlans().get(0);
+        assertEquals( 1, cogroupPlan.getLeaves().size() );        
+        assertEquals( POProject.class, cogroupPlan.getLeaves().get(0).getClass() );
+        POProject prj = (POProject)cogroupPlan.getLeaves().get(0);
+        assertEquals( 0, prj.getColumn() );
+        assertEquals( DataType.CHARARRAY, prj.getResultType() );
+        // The local rearrange feeds a global rearrange that produces tuples.
+        assertEquals( POGlobalRearrange.class, phyPlan.getSuccessors(localR).get(0).getClass() );
+        POGlobalRearrange globalR = (POGlobalRearrange)phyPlan.getSuccessors(localR).get(0);
+        assertEquals( DataType.TUPLE, globalR.getResultType() );
+        // The global rearrange feeds a package that also produces tuples.
+        assertEquals( POPackage.class, phyPlan.getSuccessors(globalR).get(0).getClass() );
+        POPackage pack = (POPackage)phyPlan.getSuccessors(globalR).get(0);
+        assertEquals( DataType.TUPLE, pack.getResultType() );
+    }
+    
+    public void testCogroup2() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("b = group a by ( name, age );");
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        // Grouping by (name, age) should give the local rearrange one key plan per group column.
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+
+        assertEquals( 1, phyPlan.getRoots().size() );
+        assertEquals( POLoad.class, phyPlan.getRoots().get(0).getClass() );
+        POLoad load = (POLoad)phyPlan.getRoots().get(0);
+        assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load).get(0).getClass() );
+        POLocalRearrange localR = (POLocalRearrange)phyPlan.getSuccessors(load).get(0);
+        assertEquals( 1, localR.getInputs().size() );
+        assertEquals( 2, localR.getPlans().size() );
+        // Key plan 0: chararray projection of column 0 (name).
+        PhysicalPlan cogroupPlan = localR.getPlans().get(0);
+        assertEquals( 1, cogroupPlan.getLeaves().size() );
+        assertEquals( POProject.class, cogroupPlan.getLeaves().get(0).getClass() );
+        POProject prj = (POProject)cogroupPlan.getLeaves().get(0);
+        assertEquals( 0, prj.getColumn() );
+        assertEquals( DataType.CHARARRAY, prj.getResultType() );
+        // Key plan 1: int projection of column 1 (age); check its shape before casting.
+        PhysicalPlan cogroupPlan2 = localR.getPlans().get(1);
+        assertEquals( 1, cogroupPlan2.getLeaves().size() );
+        assertEquals( POProject.class, cogroupPlan2.getLeaves().get(0).getClass() );
+        POProject prj2 = (POProject)cogroupPlan2.getLeaves().get(0);
+        assertEquals( 1, prj2.getColumn() );
+        assertEquals( DataType.INTEGER, prj2.getResultType() );
+        // Then local rearrange -> global rearrange -> package, both producing tuples.
+        assertEquals( POGlobalRearrange.class, phyPlan.getSuccessors(localR).get(0).getClass() );
+        POGlobalRearrange globalR = (POGlobalRearrange)phyPlan.getSuccessors(localR).get(0);
+        assertEquals( DataType.TUPLE, globalR.getResultType() );
+        assertEquals( POPackage.class, phyPlan.getSuccessors(globalR).get(0).getClass() );
+        POPackage pack = (POPackage)phyPlan.getSuccessors(globalR).get(0);
+        assertEquals( DataType.TUPLE, pack.getResultType() );
+    }
+    
+    public void testCogroup3() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("b = load 'e.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("c = group a by name, b by name;");
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");
+        // Two loads co-grouped by name: each gets its own local rearrange; both feed one global rearrange.
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+
+        assertEquals( 2, phyPlan.getRoots().size() );
+        assertEquals( POLoad.class, phyPlan.getRoots().get(0).getClass() );
+        POLoad load = (POLoad)phyPlan.getRoots().get(0);
+
+        assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load).get(0).getClass() );
+        POLocalRearrange localR = (POLocalRearrange)phyPlan.getSuccessors(load).get(0);
+        assertEquals( 1, localR.getPlans().size() );
+        PhysicalPlan cogroupPlan = localR.getPlans().get(0);
+        assertEquals( 1, cogroupPlan.getLeaves().size() );
+        assertEquals( POProject.class, cogroupPlan.getLeaves().get(0).getClass() );
+        POProject prj = (POProject)cogroupPlan.getLeaves().get(0);
+        assertEquals( 0, prj.getColumn() );
+        assertEquals( DataType.CHARARRAY, prj.getResultType() );
+
+        assertEquals( POGlobalRearrange.class, phyPlan.getSuccessors(localR).get(0).getClass() );
+        POGlobalRearrange globalR = (POGlobalRearrange)phyPlan.getSuccessors(localR).get(0);
+        assertEquals( DataType.TUPLE, globalR.getResultType() );
+        // Second input: take root 1 (not root 0) so the other load's branch is actually checked.
+        assertEquals( POLoad.class, phyPlan.getRoots().get(1).getClass() );
+        POLoad load2 = (POLoad)phyPlan.getRoots().get(1);
+        assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load2).get(0).getClass() );
+        POLocalRearrange localR2 = (POLocalRearrange)phyPlan.getSuccessors(load2).get(0);
+        assertEquals( 1, localR2.getPlans().size() );
+        PhysicalPlan cogroupPlan2 = localR2.getPlans().get(0);
+        assertEquals( POProject.class, cogroupPlan2.getLeaves().get(0).getClass() );
+        POProject prj2 = (POProject)cogroupPlan2.getLeaves().get(0);
+        assertEquals( 0, prj2.getColumn() );
+        assertEquals( DataType.CHARARRAY, prj2.getResultType() );
+        // Both local rearranges must feed the same global rearrange.
+        assertEquals( globalR, phyPlan.getSuccessors(localR2).get(0) );
+        assertEquals( POPackage.class, phyPlan.getSuccessors(globalR).get(0).getClass() );
+        POPackage pack = (POPackage)phyPlan.getSuccessors(globalR).get(0);
+        assertEquals( DataType.TUPLE, pack.getResultType() );
+    }
+    
+    public void testCogroup4() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("b = load 'e.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("c = group a by ( name, age ), b by ( name, age );");
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");
+        // Two loads co-grouped by (name, age): two key plans per local rearrange, one shared global rearrange.
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+
+        assertEquals( 2, phyPlan.getRoots().size() );
+        assertEquals( POLoad.class, phyPlan.getRoots().get(0).getClass() );
+        POLoad load = (POLoad)phyPlan.getRoots().get(0);
+
+        assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load).get(0).getClass() );
+        POLocalRearrange localR = (POLocalRearrange)phyPlan.getSuccessors(load).get(0);
+        assertEquals( 2, localR.getPlans().size() );
+        // First input, key plan 0: chararray projection of column 0 (name).
+        PhysicalPlan cogroupPlan = localR.getPlans().get(0);
+        assertEquals( 1, cogroupPlan.getLeaves().size() );
+        assertEquals( POProject.class, cogroupPlan.getLeaves().get(0).getClass() );
+        POProject prj = (POProject)cogroupPlan.getLeaves().get(0);
+        assertEquals( 0, prj.getColumn() );
+        assertEquals( DataType.CHARARRAY, prj.getResultType() );
+        // First input, key plan 1: int projection of column 1 (age).
+        PhysicalPlan cogroupPlan2 = localR.getPlans().get(1);
+        assertEquals( 1, cogroupPlan2.getLeaves().size() );
+        assertEquals( POProject.class, cogroupPlan2.getLeaves().get(0).getClass() );
+        POProject prj2 = (POProject)cogroupPlan2.getLeaves().get(0);
+        assertEquals( 1, prj2.getColumn() );
+        assertEquals( DataType.INTEGER, prj2.getResultType() );
+
+        assertEquals( POGlobalRearrange.class, phyPlan.getSuccessors(localR).get(0).getClass() );
+        POGlobalRearrange globalR = (POGlobalRearrange)phyPlan.getSuccessors(localR).get(0);
+        assertEquals( DataType.TUPLE, globalR.getResultType() );
+        // Second input: take root 1 (not root 0) so the other load's branch is actually checked.
+        assertEquals( POLoad.class, phyPlan.getRoots().get(1).getClass() );
+        POLoad load2 = (POLoad)phyPlan.getRoots().get(1);
+        assertEquals( POLocalRearrange.class, phyPlan.getSuccessors(load2).get(0).getClass() );
+        POLocalRearrange localR2 = (POLocalRearrange)phyPlan.getSuccessors(load2).get(0);
+        assertEquals( 2, localR2.getPlans().size() );
+        PhysicalPlan cogroupPlan3 = localR2.getPlans().get(0);
+        assertEquals( POProject.class, cogroupPlan3.getLeaves().get(0).getClass() );
+        POProject prj3 = (POProject)cogroupPlan3.getLeaves().get(0);
+        assertEquals( 0, prj3.getColumn() );
+        assertEquals( DataType.CHARARRAY, prj3.getResultType() );
+        PhysicalPlan cogroupPlan4 = localR2.getPlans().get(1);
+        assertEquals( POProject.class, cogroupPlan4.getLeaves().get(0).getClass() );
+        POProject prj4 = (POProject)cogroupPlan4.getLeaves().get(0);
+        assertEquals( 1, prj4.getColumn() );
+        assertEquals( DataType.INTEGER, prj4.getResultType() );
+        // Both local rearranges must feed the same global rearrange.
+        assertEquals( globalR, phyPlan.getSuccessors(localR2).get(0) );
+        assertEquals( POPackage.class, phyPlan.getSuccessors(globalR).get(0).getClass() );
+        POPackage pack = (POPackage)phyPlan.getSuccessors(globalR).get(0);
+        assertEquals( DataType.TUPLE, pack.getResultType() );
+    }
 }

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalPruneMapKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalPruneMapKeys.java?rev=923043&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalPruneMapKeys.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalPruneMapKeys.java Mon Mar 15 03:28:27 2010
@@ -0,0 +1,299 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.experimental.logical.optimizer.PlanPrinter;
+import org.apache.pig.experimental.logical.optimizer.ProjectionPatcher;
+import org.apache.pig.experimental.logical.optimizer.SchemaPatcher;
+import org.apache.pig.experimental.logical.optimizer.UidStamper;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor;
+import org.apache.pig.experimental.logical.rules.FilterAboveForeach;
+import org.apache.pig.experimental.logical.rules.ColumnMapKeyPrune;
+import org.apache.pig.experimental.logical.rules.MapKeysPruneHelper;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.PlanOptimizer;
+import org.apache.pig.experimental.plan.optimizer.PlanTransformListener;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.TestExperimentalFilterAboveForeach.MyPlanOptimizer;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.TestCase;
+
+public class TestExperimentalPruneMapKeys extends TestCase {
+
+    private PhysicalPlan translatePlan(OperatorPlan plan) throws IOException {
+        LogToPhyTranslationVisitor visitor = new LogToPhyTranslationVisitor(plan);
+        visitor.visit();
+        return visitor.getPhysicalPlan();
+    }
+    
+    private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{
+        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
+        visitor.visit();
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+        
+        try {
+            UidStamper stamper = new UidStamper(newPlan);
+            stamper.visit();
+            
+            // run filter rule
+            Set<Rule> s = new HashSet<Rule>();
+            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+            ls.add(s);
+            // Add the PruneMap Filter
+            Rule r = new ColumnMapKeyPrune("PruneMapKeys");
+            s.add(r);            
+            
+            printPlan((org.apache.pig.experimental.logical.relational.LogicalPlan)newPlan);
+            
+            // Run the optimizer
+            MyPlanOptimizer optimizer = new MyPlanOptimizer(newPlan, ls, 3);
+            optimizer.addPlanTransformListener(new ProjectionPatcher());
+            optimizer.addPlanTransformListener(new SchemaPatcher());
+            optimizer.optimize();
+            
+            return newPlan;
+        }catch(Exception e) {
+            throw new VisitorException(e);
+        }
+    }
+    
+    public class MyPlanOptimizer extends PlanOptimizer {
+
+        protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
+                int iterations) {
+            super(p, rs, iterations);           
+        }
+        
+        public void addPlanTransformListener(PlanTransformListener listener) {
+            super.addPlanTransformListener(listener);
+        }
+        
+    }
+        
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (a:map[], b:int, c:float);");
+        lpt.buildPlan("b = filter a by a#'name' == 'hello';");
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");        
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 1, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue(annotation == null || annotation.isEmpty() );
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan2() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (a:map[], b:int, c:float);");
+        lpt.buildPlan("b = filter a by a#'name' == 'hello';");
+        lpt.buildPlan("c = foreach b generate b,c;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        assertEquals( 1, newLogicalPlan.getSources().size() );
+        LOLoad load = (LOLoad) newLogicalPlan.getSources().get(0);
+        Map<Long,Set<String>> annotation = 
+            (Map<Long, Set<String>>) load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        assertTrue( annotation != null );
+        assertEquals( 1, annotation.keySet().size() );
+        Integer[] keySet = annotation.keySet().toArray( new Integer[0] );
+        assertEquals( new Integer(0), keySet[0] );
+        Set<String> keys = annotation.get(0);
+        assertEquals( 1, keys.size() );
+        assertEquals( "name", keys.toArray( new String[0] )[0] );            
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan3() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (a:map[], b:int, c:float);");
+        lpt.buildPlan("b = filter a by a#'name' == 'hello';");
+        lpt.buildPlan("c = foreach b generate a#'age',b,c;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        assertEquals( 1, newLogicalPlan.getSources().size() );
+        LOLoad load = (LOLoad) newLogicalPlan.getSources().get(0);
+        Map<Long,Set<String>> annotation = 
+            (Map<Long, Set<String>>) load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        assertTrue( annotation != null );
+        assertEquals( 1, annotation.keySet().size() );
+        Integer[] keySet = annotation.keySet().toArray( new Integer[0] );
+        assertEquals( new Integer(0), keySet[0] );
+        Set<String> keys = annotation.get(0);
+        assertEquals( 2, keys.size() );
+        assertTrue( keys.contains("name") );
+        assertTrue( keys.contains("age"));
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan4() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (a:map[], b:int, c:float);");
+        lpt.buildPlan("b = filter a by a#'name' == 'hello';");
+        lpt.buildPlan("c = foreach b generate a#'age',a,b,c;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 1, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue(annotation == null || annotation.isEmpty() );
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan5() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (a:chararray, b:int, c:float);");
+        lpt.buildPlan("b = filter a by a == 'hello';");
+        lpt.buildPlan("c = foreach b generate a,b,c;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 1, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue(annotation == null );
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan6() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("b = filter a by $0 == 'hello';");
+        lpt.buildPlan("c = foreach b generate $0,$1,$2;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 1, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue(annotation == null );
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan7() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("a1 = load 'b.txt' as (a:map[],b:int, c:float);" );
+        lpt.buildPlan("b = join a by $0, a1 by a#'name';");
+        lpt.buildPlan("c = foreach b generate $0,$1,$2;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        printPlan(plan);
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 2, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue( annotation == null || annotation.isEmpty() );
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan8() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("a1 = load 'b.txt' as (a:chararray,b:int, c:float);" );
+        lpt.buildPlan("b = join a by $0, a1 by a;");
+        lpt.buildPlan("c = foreach b generate $0,$1,$2;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        printPlan(plan);
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 2, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue( annotation == null || annotation.isEmpty() );
+        }
+    }
+    
+    public void printPlan(org.apache.pig.experimental.logical.relational.LogicalPlan logicalPlan ) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(out);
+        PlanPrinter pp = new PlanPrinter(logicalPlan,ps);
+        pp.visit();
+        System.err.println(out.toString());
+    }
+    
+    public void printPlan(LogicalPlan logicalPlan) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(out);
+        logicalPlan.explain(ps, "text", true);
+        System.err.println(out.toString());
+    }
+    
+    public void printPlan(PhysicalPlan physicalPlan) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(out);
+        physicalPlan.explain(ps, "text", true);
+        System.err.println(out.toString());
+    }
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java?rev=923043&r1=923042&r2=923043&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java Mon Mar 15 03:28:27 2010
@@ -19,7 +19,10 @@ package org.apache.pig.test;
 
 
 import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataType;
@@ -32,6 +35,7 @@ import org.apache.pig.experimental.logic
 import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.experimental.logical.expression.ProjectExpression;
 import org.apache.pig.experimental.logical.optimizer.UidStamper;
+import org.apache.pig.experimental.logical.relational.LOCogroup;
 import org.apache.pig.experimental.logical.relational.LOForEach;
 import org.apache.pig.experimental.logical.relational.LOGenerate;
 import org.apache.pig.experimental.logical.relational.LOInnerLoad;
@@ -40,9 +44,12 @@ import org.apache.pig.experimental.logic
 import org.apache.pig.experimental.logical.relational.LOStore;
 import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
 import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.test.utils.LogicalPlanTester;
 
 import junit.framework.TestCase;
@@ -392,6 +399,354 @@ public class TestLogicalPlanMigrationVis
         assertTrue(schema.getField("d::id")==schema.getField(1));
     }
     
+    /**
+     * Single-input group by one column: the output schema is (group, bag of a).
+     * The group field gets a fresh uid, the bag's inner fields keep the load uids,
+     * and the grouping expression is a projection of column 0 of input 0.
+     */
+    public void testCoGroup() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("b = group a by name;");
+        LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
+
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+
+        // The load schema is fetched before the cogroup schema.
+        LogicalSchema loadSchema =
+            ((LogicalRelationalOperator) newPlan.getSources().get(0)).getSchema();
+        Set<Long> uids = getAllUids(loadSchema);
+
+        assertEquals(LOCogroup.class, newPlan.getSuccessors(newPlan.getSources().get(0)).get(0).getClass());
+        LOCogroup cogroup = (LOCogroup) newPlan.getSuccessors(newPlan.getSources().get(0)).get(0);
+        LogicalSchema schema = cogroup.getSchema();
+        assertEquals(2, schema.size());
+
+        // Field 0: the group key.
+        LogicalFieldSchema group = schema.getField(0);
+        assertEquals(DataType.CHARARRAY, group.type);
+        assertFalse(uids.contains(group.uid));
+        assertEquals("group", group.alias);
+
+        // Field 1: the bag for input a; inner fields must keep the load's uids.
+        LogicalFieldSchema bag = schema.getField(1);
+        assertEquals(DataType.BAG, bag.type);
+        LogicalSchema inner = bag.schema;
+        assertEquals(DataType.CHARARRAY, inner.getField(0).type);
+        assertEquals("name", inner.getField(0).alias);
+        assertEquals(loadSchema.getField(0).uid, inner.getField(0).uid);
+        assertEquals(DataType.INTEGER, inner.getField(1).type);
+        assertEquals("age", inner.getField(1).alias);
+        assertEquals(loadSchema.getField(1).uid, inner.getField(1).uid);
+        assertEquals(DataType.FLOAT, inner.getField(2).type);
+        assertEquals("gpa", inner.getField(2).alias);
+        assertEquals(loadSchema.getField(2).uid, inner.getField(2).uid);
+
+        // The bag's uid must differ from the load uids and from the group uid.
+        uids.add(group.uid);
+        assertFalse(uids.contains(bag.uid));
+
+        MultiMap<Integer, LogicalExpressionPlan> expressionPlans = cogroup.getExpressionPlans();
+        assertEquals(1, expressionPlans.size());
+        List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>) expressionPlans.get(Integer.valueOf(0));
+        assertEquals(1, plans.size());
+
+        LogicalExpressionPlan exprPlan = plans.get(0);
+        assertEquals(1, exprPlan.getSinks().size());
+        assertEquals(ProjectExpression.class, exprPlan.getSinks().get(0).getClass());
+        ProjectExpression prj = (ProjectExpression) exprPlan.getSinks().get(0);
+        assertEquals(loadSchema.getField(0).uid, prj.getUid());
+        assertEquals(0, prj.getColNum());
+        assertEquals(0, prj.getInputNum());
+    }
+    
+    /**
+     * Single-input group by a tuple of two columns: the group field is a tuple
+     * (name, age) and every new field (group, its members, the bag) must get a
+     * uid distinct from all previously seen ones.
+     */
+    public void testCoGroup2() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("b = group a by ( name, age );");
+        LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
+
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+
+        // The load schema is fetched before the cogroup schema.
+        LogicalSchema loadSchema =
+            ((LogicalRelationalOperator) newPlan.getSources().get(0)).getSchema();
+        Set<Long> uids = getAllUids(loadSchema);
+
+        assertEquals(LOCogroup.class, newPlan.getSuccessors(newPlan.getSources().get(0)).get(0).getClass());
+        LOCogroup cogroup = (LOCogroup) newPlan.getSuccessors(newPlan.getSources().get(0)).get(0);
+        LogicalSchema schema = cogroup.getSchema();
+        assertEquals(2, schema.size());
+
+        // Field 0: the tuple-valued group key.
+        LogicalFieldSchema group = schema.getField(0);
+        assertEquals(DataType.TUPLE, group.type);
+        assertFalse(uids.contains(group.uid));
+        assertEquals("group", group.alias);
+        assertEquals(DataType.CHARARRAY, group.schema.getField(0).type);
+        assertEquals(DataType.INTEGER, group.schema.getField(1).type);
+
+        // Field 1: the bag for input a; inner fields must keep the load's uids.
+        LogicalFieldSchema bag = schema.getField(1);
+        assertEquals(DataType.BAG, bag.type);
+        LogicalSchema inner = bag.schema;
+        assertEquals(DataType.CHARARRAY, inner.getField(0).type);
+        assertEquals("name", inner.getField(0).alias);
+        assertEquals(loadSchema.getField(0).uid, inner.getField(0).uid);
+        assertEquals(DataType.INTEGER, inner.getField(1).type);
+        assertEquals("age", inner.getField(1).alias);
+        assertEquals(loadSchema.getField(1).uid, inner.getField(1).uid);
+        assertEquals(DataType.FLOAT, inner.getField(2).type);
+        assertEquals("gpa", inner.getField(2).alias);
+        assertEquals(loadSchema.getField(2).uid, inner.getField(2).uid);
+
+        // Uid checks come last: each new uid must differ from every uid seen so far.
+        uids.add(group.uid);
+        assertFalse(uids.contains(group.schema.getField(0).uid));
+        uids.add(group.schema.getField(0).uid);
+        assertFalse(uids.contains(group.schema.getField(1).uid));
+        uids.add(group.schema.getField(1).uid);
+        assertFalse(uids.contains(bag.uid));
+
+        MultiMap<Integer, LogicalExpressionPlan> expressionPlans = cogroup.getExpressionPlans();
+        assertEquals(1, expressionPlans.size());
+        List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>) expressionPlans.get(Integer.valueOf(0));
+        assertEquals(2, plans.size());
+
+        // One projection per grouping column, both reading input 0.
+        LogicalExpressionPlan exprPlan = plans.get(0);
+        assertEquals(1, exprPlan.getSinks().size());
+        assertEquals(ProjectExpression.class, exprPlan.getSinks().get(0).getClass());
+        ProjectExpression prj = (ProjectExpression) exprPlan.getSinks().get(0);
+        assertEquals(loadSchema.getField(0).uid, prj.getUid());
+        assertEquals(0, prj.getColNum());
+        assertEquals(0, prj.getInputNum());
+
+        LogicalExpressionPlan exprPlan2 = plans.get(1);
+        assertEquals(1, exprPlan2.getSinks().size());
+        assertEquals(ProjectExpression.class, exprPlan2.getSinks().get(0).getClass());
+        ProjectExpression prj2 = (ProjectExpression) exprPlan2.getSinks().get(0);
+        assertEquals(loadSchema.getField(1).uid, prj2.getUid());
+        assertEquals(1, prj2.getColNum());
+        assertEquals(0, prj2.getInputNum());
+    }
+    
+    /**
+     * Two-input cogroup on one column each: the output schema is
+     * (group, bag of a, bag of b). Each bag's inner fields keep the uids of
+     * the corresponding load, and each input has one projection plan.
+     */
+    public void testCoGroup3() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("b = load '/test/e.txt' as (name:chararray, blah:chararray );");
+        lpt.buildPlan("c = group a by name, b by name;");
+        LogicalPlan plan = lpt.buildPlan("store c into '/test/empty';");
+
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+
+        assertEquals(LOCogroup.class, newPlan.getSuccessors(newPlan.getSources().get(0)).get(0).getClass());
+        LOCogroup cogroup = (LOCogroup) newPlan.getSuccessors(newPlan.getSources().get(0)).get(0);
+
+        // Load schemas are taken from the cogroup's predecessors so that they line
+        // up with cogroup inputs 0 and 1 (presumably getSources() order is not
+        // guaranteed to match).
+        LogicalSchema loadSchema =
+            ((LogicalRelationalOperator) newPlan.getPredecessors(cogroup).get(0)).getSchema();
+        LogicalSchema load2Schema =
+            ((LogicalRelationalOperator) newPlan.getPredecessors(cogroup).get(1)).getSchema();
+
+        Set<Long> uids = getAllUids(loadSchema);
+        uids.addAll(getAllUids(load2Schema));
+
+        LogicalSchema schema = cogroup.getSchema();
+        assertEquals(3, schema.size());
+
+        // Field 0: the group key.
+        LogicalFieldSchema group = schema.getField(0);
+        assertEquals(DataType.CHARARRAY, group.type);
+        assertFalse(uids.contains(group.uid));
+        assertEquals("group", group.alias);
+
+        // Field 1: the bag for input a; inner fields must keep a's uids.
+        LogicalFieldSchema bag1 = schema.getField(1);
+        assertEquals(DataType.BAG, bag1.type);
+        LogicalSchema inner1 = bag1.schema;
+        assertEquals(DataType.CHARARRAY, inner1.getField(0).type);
+        assertEquals("name", inner1.getField(0).alias);
+        assertEquals(loadSchema.getField(0).uid, inner1.getField(0).uid);
+        assertEquals(DataType.INTEGER, inner1.getField(1).type);
+        assertEquals("age", inner1.getField(1).alias);
+        assertEquals(loadSchema.getField(1).uid, inner1.getField(1).uid);
+        assertEquals(DataType.FLOAT, inner1.getField(2).type);
+        assertEquals("gpa", inner1.getField(2).alias);
+        assertEquals(loadSchema.getField(2).uid, inner1.getField(2).uid);
+
+        // Field 2: the bag for input b; inner fields must keep b's uids.
+        LogicalFieldSchema bag2 = schema.getField(2);
+        assertEquals(DataType.BAG, bag2.type);
+        LogicalSchema inner2 = bag2.schema;
+        assertEquals(DataType.CHARARRAY, inner2.getField(0).type);
+        assertEquals("name", inner2.getField(0).alias);
+        assertEquals(load2Schema.getField(0).uid, inner2.getField(0).uid);
+        assertEquals(DataType.CHARARRAY, inner2.getField(1).type);
+        assertEquals("blah", inner2.getField(1).alias);
+        assertEquals(load2Schema.getField(1).uid, inner2.getField(1).uid);
+
+        // Uid checks come last: each new uid must differ from every uid seen so far.
+        uids.add(group.uid);
+        assertFalse(uids.contains(bag1.uid));
+        uids.add(bag1.uid);
+        assertFalse(uids.contains(bag2.uid));
+
+        MultiMap<Integer, LogicalExpressionPlan> expressionPlans = cogroup.getExpressionPlans();
+        assertEquals(2, expressionPlans.size());
+        List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>) expressionPlans.get(Integer.valueOf(0));
+        assertEquals(1, plans.size());
+        List<LogicalExpressionPlan> plans2 = (List<LogicalExpressionPlan>) expressionPlans.get(Integer.valueOf(1));
+        assertEquals(1, plans2.size());
+
+        // Input 0 groups on a.name (column 0).
+        LogicalExpressionPlan exprPlan = plans.get(0);
+        assertEquals(1, exprPlan.getSinks().size());
+        assertEquals(ProjectExpression.class, exprPlan.getSinks().get(0).getClass());
+        ProjectExpression prj = (ProjectExpression) exprPlan.getSinks().get(0);
+        assertEquals(loadSchema.getField(0).uid, prj.getUid());
+        assertEquals(0, prj.getColNum());
+        assertEquals(0, prj.getInputNum());
+
+        // Input 1 groups on b.name (column 0).
+        LogicalExpressionPlan exprPlan2 = plans2.get(0);
+        assertEquals(1, exprPlan2.getSinks().size());
+        assertEquals(ProjectExpression.class, exprPlan2.getSinks().get(0).getClass());
+        ProjectExpression prj2 = (ProjectExpression) exprPlan2.getSinks().get(0);
+        assertEquals(load2Schema.getField(0).uid, prj2.getUid());
+        assertEquals(0, prj2.getColNum());
+        assertEquals(1, prj2.getInputNum());
+    }
+    
+    public void testCoGroup4() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (name:chararray, age:int, gpa:float);");
+        lpt.buildPlan("b = load '/test/e.txt' as (name:chararray, age:int, blah:chararray );");
+        lpt.buildPlan("c = group a by ( name, age ), b by ( name, age );");
+        LogicalPlan plan = lpt.buildPlan("store c into '/test/empty';");
+        
+        // check basics
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        
+        assertEquals( LOCogroup.class, newPlan.getSuccessors( newPlan.getSources().get(0) ).get(0).getClass() );
+        LOCogroup cogroup = (LOCogroup) newPlan.getSuccessors( newPlan.getSources().get(0) ).get(0);
+        
+        // Reason for this strange way of getting the load schema is to maintain the sequence correctly        
+        LogicalSchema loadSchema = 
+            ((LogicalRelationalOperator)newPlan.getPredecessors(cogroup).get(0)).getSchema();
+        
+        LogicalSchema load2Schema = 
+            ((LogicalRelationalOperator)newPlan.getPredecessors(cogroup).get(1)).getSchema();
+        
+        Set<Long> uids = getAllUids(loadSchema);
+        uids.addAll( getAllUids( load2Schema ) );
+        
+        LogicalRelationalOperator op = (LogicalRelationalOperator) 
+            newPlan.getSuccessors( newPlan.getSources().get(0) ).get(0);
+        assertEquals( LOCogroup.class, op.getClass() );
+        LogicalSchema schema = op.getSchema();
+        
+        assertEquals( 3, schema.size() );
+        assertEquals( DataType.TUPLE, schema.getField(0).type );
+        assertEquals( false, uids.contains( schema.getField(0).uid ) );
+        assertEquals( 0, schema.getField(0).alias.compareTo("group") );
+        assertEquals( DataType.CHARARRAY, schema.getField(0).schema.getField(0).type );
+        assertEquals( 0, schema.getField(0).schema.getField(0).alias.compareTo("name") );
+        assertEquals( DataType.INTEGER, schema.getField(0).schema.getField(1).type );
+        assertEquals( 0, schema.getField(0).schema.getField(1).alias.compareTo("age") );                
+               
+
+        assertEquals( DataType.BAG, schema.getField(1).type );
+        
+        assertEquals( DataType.CHARARRAY, schema.getField(1).schema.getField(0).type );
+        assertEquals( 0, schema.getField(1).schema.getField(0).alias.compareTo("name") );
+        assertEquals( loadSchema.getField(0).uid, schema.getField(1).schema.getField(0).uid );
+        assertEquals( DataType.INTEGER, schema.getField(1).schema.getField(1).type );
+        assertEquals( 0, schema.getField(1).schema.getField(1).alias.compareTo("age") );
+        assertEquals( loadSchema.getField(1).uid, schema.getField(1).schema.getField(1).uid );
+        assertEquals( DataType.FLOAT, schema.getField(1).schema.getField(2).type );
+        assertEquals( 0, schema.getField(1).schema.getField(2).alias.compareTo("gpa") );
+        assertEquals( loadSchema.getField(2).uid, schema.getField(1).schema.getField(2).uid );
+        
+        assertEquals( DataType.BAG, schema.getField(2).type );
+        
+        assertEquals( DataType.CHARARRAY, schema.getField(2).schema.getField(0).type );
+        assertEquals( 0, schema.getField(2).schema.getField(0).alias.compareTo("name") );
+        assertEquals( load2Schema.getField(0).uid, schema.getField(2).schema.getField(0).uid );
+        assertEquals( DataType.INTEGER, schema.getField(2).schema.getField(1).type );
+        assertEquals( 0, schema.getField(2).schema.getField(1).alias.compareTo("age") );
+        assertEquals( load2Schema.getField(1).uid, schema.getField(2).schema.getField(1).uid );
+        assertEquals( DataType.CHARARRAY, schema.getField(2).schema.getField(2).type );
+        assertEquals( 0, schema.getField(2).schema.getField(2).alias.compareTo("blah") );
+        assertEquals( load2Schema.getField(2).uid, schema.getField(2).schema.getField(2).uid );        
+        
+        
+        // We are doing Uid tests at the end as the uids should not repeat
+        assertEquals( false, uids.contains( schema.getField(0).schema.getField(0).uid ) );
+        assertEquals( false, uids.contains( schema.getField(0).schema.getField(1).uid ) );
+        assertEquals( false, uids.contains( schema.getField(1).uid ) );
+        uids.add( schema.getField(1).uid );
+        assertEquals( false, uids.contains( schema.getField(2).uid) );
+        
+        
+        MultiMap<Integer, LogicalExpressionPlan> expressionPlans = cogroup.getExpressionPlans();
+        assertEquals( 2, expressionPlans.size() );
+        List<LogicalExpressionPlan> plans = (List<LogicalExpressionPlan>) expressionPlans.get(Integer.valueOf(0));
+        assertEquals( 2, plans.size() );
+        
+        List<LogicalExpressionPlan> plans2 = (List<LogicalExpressionPlan>) expressionPlans.get(Integer.valueOf(1));
+        assertEquals( 2, plans2.size() );
+        
+        LogicalExpressionPlan exprPlan = plans.get(0);
+        assertEquals( 1, exprPlan.getSinks().size() );
+        assertEquals( ProjectExpression.class, exprPlan.getSinks().get(0).getClass() );
+        ProjectExpression prj = (ProjectExpression) exprPlan.getSinks().get(0);
+        assertEquals( loadSchema.getField(0).uid, prj.getUid() );
+        assertEquals( 0, prj.getColNum() );
+        assertEquals( 0, prj.getInputNum() );
+        
+        LogicalExpressionPlan exprPlan2 = plans.get(1);
+        assertEquals( 1, exprPlan2.getSinks().size() );
+        assertEquals( ProjectExpression.class, exprPlan2.getSinks().get(0).getClass() );
+        ProjectExpression prj2 = (ProjectExpression) exprPlan2.getSinks().get(0);
+        assertEquals( loadSchema.getField(1).uid, prj2.getUid() );
+        assertEquals( 1, prj2.getColNum() );
+        assertEquals( 0, prj2.getInputNum() );        
+        
+        LogicalExpressionPlan exprPlan3 = plans2.get(0);
+        assertEquals( 1, exprPlan3.getSinks().size() );
+        assertEquals( ProjectExpression.class, exprPlan3.getSinks().get(0).getClass() );
+        ProjectExpression prj3 = (ProjectExpression) exprPlan3.getSinks().get(0);
+        assertEquals( load2Schema.getField(0).uid, prj3.getUid() );
+        assertEquals( 0, prj3.getColNum() );
+        assertEquals( 1, prj3.getInputNum() );
+        
+        LogicalExpressionPlan exprPlan4 = plans2.get(1);
+        assertEquals( 1, exprPlan4.getSinks().size() );
+        assertEquals( ProjectExpression.class, exprPlan4.getSinks().get(0).getClass() );
+        ProjectExpression prj4 = (ProjectExpression) exprPlan4.getSinks().get(0);
+        assertEquals( load2Schema.getField(1).uid, prj4.getUid() );
+        assertEquals( 1, prj4.getColNum() );
+        assertEquals( 1, prj4.getInputNum() );
+    }
+    
+    /**
+     * Collects every uid that appears in the given schema, descending
+     * recursively into the inner schemas of bag and tuple fields.
+     * <p>
+     * The uid of a bag/tuple field itself is included in addition to the
+     * uids of its nested fields, so callers that check new uids for
+     * collisions against this set see every uid the schema contains.
+     *
+     * @param schema schema to scan; may be null, in which case an empty set
+     *               is returned
+     * @return a new, mutable set of all uids found in the schema and its
+     *         nested schemas
+     */
+    private Set<Long> getAllUids( LogicalSchema schema ) {
+        Set<Long> uids = new HashSet<Long>();
+        
+        if( schema == null ) {
+            return uids;
+        }
+        for( LogicalFieldSchema fieldSchema : schema.getFields() ) {
+            // Every field, simple or complex, carries its own uid; the
+            // previous version dropped it for bags/tuples with a schema.
+            uids.add( fieldSchema.uid );
+            // Complex fields additionally contain the uids of their inner fields.
+            if( ( fieldSchema.type == DataType.BAG || 
+                    fieldSchema.type == DataType.TUPLE ) &&
+                    fieldSchema.schema != null ) {
+                uids.addAll( getAllUids( fieldSchema.schema ) );
+            }
+        }
+        return uids;
+    }
+    
     private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{
         LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);    	
         visitor.visit();