You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/03 18:58:21 UTC

svn commit: r1099123 [6/16] - in /pig/branches/branch-0.9: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/ src/org/apache/pig/impl/logi...

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestNewPlanPushDownForeachFlatten.java Tue May  3 16:58:19 2011
@@ -18,7 +18,6 @@
 package org.apache.pig.test;
 
 
-import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
@@ -27,14 +26,15 @@ import java.util.Set;
 
 import org.apache.pig.ExecType;
 import org.apache.pig.FilterFunc;
+import org.apache.pig.PigServer;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.test.utils.Identity;
-import org.apache.pig.test.utils.LogicalPlanTester;
-import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
-import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.Rule;
 import org.apache.pig.newplan.logical.relational.LOCross;
 import org.apache.pig.newplan.logical.relational.LOForEach;
 import org.apache.pig.newplan.logical.relational.LOJoin;
@@ -42,13 +42,9 @@ import org.apache.pig.newplan.logical.re
 import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LOSort;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
-import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
-import org.apache.pig.newplan.optimizer.PlanOptimizer;
-import org.apache.pig.newplan.optimizer.Rule;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
 import org.apache.pig.newplan.logical.rules.OptimizerUtils;
 import org.apache.pig.newplan.logical.rules.PushDownForEachFlatten;
-import org.apache.pig.newplan.logical.rules.TypeCastInserter;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -59,11 +55,9 @@ import org.junit.Before;
  */
 public class TestNewPlanPushDownForeachFlatten {
     PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
-    LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
     
     @Before
     public void tearDown() {
-        planTester.reset();
     }
 
     /**
@@ -84,9 +78,7 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testErrorEmptyInput() throws Exception {
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            new org.apache.pig.impl.logicalLayer.LogicalPlan();
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( "" );
         
         Assert.assertTrue( newLogicalPlan.getOperators().hasNext() ==  false );
     }
@@ -96,22 +88,23 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testErrorNonForeachInput() throws Exception {
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+                       "store A into 'output';";
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
         List<Operator> nexts = newLogicalPlan.getSuccessors( load );
-        Assert.assertTrue( nexts == null || nexts.size() == 0 );
+        Assert.assertTrue( nexts != null && nexts.size() == 1 );
 }
     
     @Test
     public void testForeachNoFlatten() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, $2;");
-        planTester.buildPlan("C = order B by $0, $1;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan( "D = store C into 'dummy';" );
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, $2;" +
+        "C = order B by $0, $1;" +
+         "D = store C into 'dummy';";
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -125,9 +118,10 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachNoSuccessors() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("B = foreach A generate flatten($1);");
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+                       "B = foreach A generate flatten($1);" +
+                       "Store B into 'output';";
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -137,10 +131,11 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachStreaming() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate flatten($1);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = stream B through `" + "pc -l" + "`;");
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate flatten($1);" +
+        "C = stream B through `" + "pc -l" + "`;" +
+        "Store C into 'output';";
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -150,11 +145,12 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachDistinct() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate flatten($1);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = distinct B;");
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate flatten($1);" +
+        "C = distinct B;" +
+        "store C into 'output';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -164,11 +160,12 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachForeach() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");        
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = foreach B generate $0;");
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, flatten(1);" +        
+        "C = foreach B generate $0;" +
+        "store C into 'output';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -185,11 +182,12 @@ public class TestNewPlanPushDownForeachF
 
     @Test
     public void testForeachFilter() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");        
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = filter B by $1 < 18;");
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, flatten($2);" +        
+        "C = filter B by $1 < 18;" +
+        "store C into 'output';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -202,11 +200,13 @@ public class TestNewPlanPushDownForeachF
 
     @Test
     public void testForeachSplitOutput() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("split B into C if $1 < 18, D if $1 >= 18;");
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "split B into C if $1 < 18, D if $1 >= 18;" +
+        "store C into 'output1';" + 
+        "store D into 'output2';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -219,11 +219,12 @@ public class TestNewPlanPushDownForeachF
 
     @Test
     public void testForeachLimit() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("B = limit B 10;");
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = limit B 10;" +
+        "store C into 'output';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -236,12 +237,13 @@ public class TestNewPlanPushDownForeachF
 
     @Test
     public void testForeachUnion() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("D = union B, C;");        
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference);" +
+        "D = union B, C;" +
+        "store D into 'output';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -262,12 +264,13 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachCogroup() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("D = cogroup B by $0, C by $0;");
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference);" +
+        "D = cogroup B by $0, C by $0;" +
+        "store D into 'output';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -288,11 +291,12 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachGroupBy() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = group B by $0;");
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = group B by $0;" +
+        "store C into 'output';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -305,11 +309,11 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachSort() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = order B by $0, $1;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan( "D = store C into 'dummy';" );
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = order B by $0, $1;" +
+        "D = store C into 'dummy';";
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -326,11 +330,11 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachSortNegative1() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0 + 5, $1, flatten($2);");
-        planTester.buildPlan("C = order B by $0, $1;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan( "D = store C into 'dummy';" );
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0 + 5, $1, flatten($2);" +
+        "C = order B by $0, $1;" +
+         "D = store C into 'dummy';";
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -348,11 +352,11 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachSortNegative2() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:tuple(x,y));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = order B by $0, $3;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan( "D = store C into 'dummy';" );
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:tuple(x,y));" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = order B by $0, $3;" +
+        "D = store C into 'dummy';";
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -366,10 +370,12 @@ public class TestNewPlanPushDownForeachF
 
     @Test
     public void testForeachFlattenAddedColumnSort() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, flatten(1);" +
+        "C = order B by $0, $1;" +
+        "store C into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -383,11 +389,12 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachUDFSort() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) ;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate $0, $1, " + Identity.class.getName() + "($2) ;" +
+        "C = order B by $0, $1;" +
+        "store C into 'output';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -401,11 +408,12 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachCastSort() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa);");
-        planTester.buildPlan("B = foreach A generate (chararray)$0, $1, flatten($2);");        
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = order B by $0, $1;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa);" +
+        "B = foreach A generate (chararray)$0, $1, flatten($2);" +        
+        "C = order B by $0, $1;" +
+        "store C into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -419,13 +427,14 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachCross() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
-        planTester.buildPlan("D = cross B, C;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference);" +
+        "D = cross B, C;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -453,13 +462,14 @@ public class TestNewPlanPushDownForeachF
 
     @Test
     public void testForeachCross1() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
-        planTester.buildPlan("D = cross A, C;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "C = foreach B generate $0, $1, flatten($2);" +
+        "D = cross A, C;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -490,17 +500,21 @@ public class TestNewPlanPushDownForeachF
     // A new rule should optimize this case
     @Test
     public void testForeachCross2() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
-        planTester.buildPlan("E = cross B, D;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = foreach C generate $0, $1, flatten($2);" +
+        "E = cross B, D;" +
+        "F = limit E 10;" +
+        "store F into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         // No optimization about foreach flatten.
-        Assert.assertTrue( newLogicalPlan.getPredecessors( newLogicalPlan.getSinks().get( 0 ) ).get( 0 ) instanceof LOCross );
+        Operator store = newLogicalPlan.getSinks().get( 0 );
+        Operator limit = newLogicalPlan.getPredecessors(store).get(0);
+        Operator cross = newLogicalPlan.getPredecessors(limit).get(0);
+        Assert.assertTrue( cross instanceof LOCross );
     }
     
     /**
@@ -509,13 +523,14 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachFlattenAddedColumnCross() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = cross B, C;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, $1, flatten(1);" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = cross B, C;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -546,13 +561,14 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachUDFCross() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = cross B, C;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = cross B, C;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -583,13 +599,14 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachCastCross() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten( $2 );");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = cross B, C;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, (int)$1, flatten( $2 );" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = cross B, C;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -617,13 +634,15 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachFRJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference);");
-        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference);" +
+        "D = join B by $0, C by $0 using 'replicated';" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -651,13 +670,14 @@ public class TestNewPlanPushDownForeachF
 
     @Test
     public void testForeachFRJoin1() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
-        planTester.buildPlan("D = join A by $0, C by $0 using \"replicated\";");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "C = foreach B generate $0, $1, flatten($2);" +
+        "D = join A by $0, C by $0 using 'replicated';" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -688,17 +708,21 @@ public class TestNewPlanPushDownForeachF
     // A new rule should optimize this case
     @Test
     public void testForeachFRJoin2() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
-        planTester.buildPlan("E = join B by $0, D by $0 using \"replicated\";");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = foreach C generate $0, $1, flatten($2);" +
+        "E = join B by $0, D by $0 using 'replicated';" +
+        "F = limit E 10;" +
+        "store F into 'output';";
         
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         // No optimization about foreach flatten.
-        Assert.assertTrue( newLogicalPlan.getPredecessors( newLogicalPlan.getSinks().get( 0 ) ).get( 0 ) instanceof LOJoin );
+        Operator store = newLogicalPlan.getSinks().get( 0 );
+        Operator limit = newLogicalPlan.getPredecessors( store ).get( 0 );
+        Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
+        Assert.assertTrue( join instanceof LOJoin );
     }
     
     /**
@@ -707,13 +731,14 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachFlattenAddedColumnFRJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, $1, flatten(1);" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = join B by $0, C by $0 using 'replicated';" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -745,13 +770,14 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachUDFFRJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = join B by $0, C by $0 using 'replicated';" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -783,13 +809,14 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachCastFRJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0 using \"replicated\";");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, (int)$1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = join B by $0, C by $0 using 'replicated';" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -817,13 +844,14 @@ public class TestNewPlanPushDownForeachF
 
     @Test
     public void testForeachInnerJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = join B by $0, C by $0;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -851,13 +879,14 @@ public class TestNewPlanPushDownForeachF
     
     @Test
     public void testForeachInnerJoin1() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("C = foreach B generate $0, $1, flatten($2);");
-        planTester.buildPlan("D = join A by $0, C by $0;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "C = foreach B generate $0, $1, flatten($2);" +
+        "D = join A by $0, C by $0;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -888,17 +917,21 @@ public class TestNewPlanPushDownForeachF
     // A new rule should optimize this case
     @Test
     public void testForeachInnerJoin2() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = foreach C generate $0, $1, flatten($2);");
-        planTester.buildPlan("E = join B by $0, D by $0;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("F = limit E 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, $1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = foreach C generate $0, $1, flatten($2);" +
+        "E = join B by $0, D by $0;" +
+        "F = limit E 10;" +
+        "store F into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         // No optimization about foreach flatten.
-        Assert.assertTrue( newLogicalPlan.getPredecessors( newLogicalPlan.getSinks().get( 0 ) ).get( 0 ) instanceof LOJoin );
+        Operator store = newLogicalPlan.getSinks().get( 0 );
+        Operator limit = newLogicalPlan.getPredecessors( store ).get( 0 );
+        Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
+        Assert.assertTrue( join instanceof LOJoin );
     }
     
     /**
@@ -907,17 +940,14 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachFlattenAddedColumnInnerJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, $1, flatten(1);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        planTester.setPlan(lp);
-        planTester.setProjectionMap(lp);
-        planTester.rebuildSchema(lp);
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, $1, flatten(1);" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = join B by $0, C by $0;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -948,13 +978,14 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachUDFInnerJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, flatten($1), " + Identity.class.getName() + "($2) ;" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = join B by $0, C by $0;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -985,13 +1016,14 @@ public class TestNewPlanPushDownForeachF
      */
     @Test
     public void testForeachCastInnerJoin() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));");
-        planTester.buildPlan("B = foreach A generate $0, (int)$1, flatten($2);");
-        planTester.buildPlan("C = load 'anotherfile' as (name, age, preference:(course_name, instructor));");
-        planTester.buildPlan("D = join B by $0, C by $0;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (name, age, gpa:(letter_grade, point_score));" +
+        "B = foreach A generate $0, (int)$1, flatten($2);" +
+        "C = load 'anotherfile' as (name, age, preference:(course_name, instructor));" +
+        "D = join B by $0, C by $0;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         List<Operator> loads = newLogicalPlan.getSources();
         Assert.assertTrue( loads.size() == 2 );
@@ -1020,26 +1052,31 @@ public class TestNewPlanPushDownForeachF
     // See PIG-1172
     @Test
     public void testForeachJoinRequiredField() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});");
-        planTester.buildPlan("B = FOREACH A generate flatten($0);");
-        planTester.buildPlan("C = load '3.txt' AS (c0, c1);");
-        planTester.buildPlan("D = JOIN B by a1, C by c1;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("E = limit D 10;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (bg:bag{t:tuple(a0,a1)});" +
+        "B = FOREACH A generate flatten($0);" +
+        "C = load '3.txt' AS (c0, c1);" +
+        "D = JOIN B by a1, C by c1;" +
+        "E = limit D 10;" +
+        "store E into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         // No optimization about foreach flatten.
-        Assert.assertTrue( newLogicalPlan.getPredecessors( newLogicalPlan.getSinks().get( 0 ) ).get( 0 ) instanceof LOJoin );
+        Operator store = newLogicalPlan.getSinks().get( 0 );
+        Operator limit = newLogicalPlan.getPredecessors( store ).get( 0 );
+        Operator join = newLogicalPlan.getPredecessors( limit ).get( 0 );
+        Assert.assertTrue( join instanceof LOJoin );
     }
     
     // See PIG-1374
     @Test
     public void testForeachRequiredField() throws Exception {
-        planTester.buildPlan("A = load 'myfile' as (b{t(a0:chararray,a1:int)});");
-        planTester.buildPlan("B = foreach A generate flatten($0);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("C = order B by $1 desc;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "A = load 'myfile' as (b:bag{t:tuple(a0:chararray,a1:int)});" +
+        "B = foreach A generate flatten($0);" +
+        "C = order B by $1 desc;" +
+        "store C into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
         Operator load = newLogicalPlan.getSources().get( 0 );
         Assert.assertTrue( load instanceof LOLoad );
@@ -1054,14 +1091,16 @@ public class TestNewPlanPushDownForeachF
     // See PIG-1706
     @Test
     public void testForeachWithUserDefinedSchema() throws Exception {
-        planTester.buildPlan("a = load '1.txt' as (a0:int, a1, a2:bag{t:(i1:int, i2:int)});");
-        planTester.buildPlan("b = load '2.txt' as (b0:int, b1);");
-        planTester.buildPlan("c = foreach a generate a0, flatten(a2) as (q1, q2);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = planTester.buildPlan("d = join c by a0, b by b0;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "a = load '1.txt' as (a0:int, a1, a2:bag{t:(i1:int, i2:int)});" +
+        "b = load '2.txt' as (b0:int, b1);" +
+        "c = foreach a generate a0, flatten(a2) as (q1, q2);" +
+        "d = join c by a0, b by b0;" +
+        "store d into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
-        LOForEach foreach = (LOForEach)newLogicalPlan.getSinks().get( 0 );
+        Operator store = newLogicalPlan.getSinks().get( 0 );
+        LOForEach foreach = (LOForEach)newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue(foreach.getSchema().getField(1).alias.equals("q1"));
         Assert.assertTrue(foreach.getSchema().getField(2).alias.equals("q2"));
     }
@@ -1069,14 +1108,16 @@ public class TestNewPlanPushDownForeachF
     // See PIG-1751
     @Test
     public void testForeachWithUserDefinedSchema2() throws Exception {
-        planTester.buildPlan("a = load '1.txt' as (a0:chararray);");
-        planTester.buildPlan("b = load '2.txt' as (b0:chararray);");
-        planTester.buildPlan("c = foreach b generate flatten(STRSPLIT(b0)) as c0;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan  lp = planTester.buildPlan("d = join c by (chararray)c0, a by a0;");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        String query = "a = load '1.txt' as (a0:chararray);" +
+        "b = load '2.txt' as (b0:chararray);" +
+        "c = foreach b generate flatten(STRSPLIT(b0)) as c0;" +
+        "d = join c by (chararray)c0, a by a0;" +
+        "store d into 'output';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
         
-        Operator op = newLogicalPlan.getSinks().get( 0 );
+        Operator store = newLogicalPlan.getSinks().get( 0 );
+        Operator op = newLogicalPlan.getPredecessors(store).get(0);
         Assert.assertTrue(op instanceof LOJoin);
     }
 
@@ -1103,18 +1144,13 @@ public class TestNewPlanPushDownForeachF
         }
     }    
 
-    private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws IOException {
-        LogicalPlan newLogicalPlan = migratePlan( plan );
+    private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
+    	PigServer pigServer = new PigServer( pc );
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
         return newLogicalPlan;
     }
 
-    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
-        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
-        visitor.visit();
-        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
-        return newPlan;
-    }
 }
 

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestOptimizeLimit.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestOptimizeLimit.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestOptimizeLimit.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestOptimizeLimit.java Tue May  3 16:58:19 2011
@@ -23,7 +23,7 @@ import java.io.IOException;
 import java.util.*;
 
 import org.apache.pig.ExecType;
-import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.PigServer;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
@@ -31,17 +31,13 @@ import org.apache.pig.newplan.logical.ru
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
 import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.optimizer.OpLimitOptimizer;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.plan.optimizer.OptimizerException;
-import org.apache.pig.test.TestLogicalOptimizer.LogicalOptimizerDerivative;
-import org.apache.pig.test.utils.LogicalPlanTester;
 
 import junit.framework.Assert;
 
-import org.junit.AfterClass;
-import org.junit.BeforeClass;
+import org.junit.After;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestOptimizeLimit {
@@ -49,15 +45,15 @@ public class TestOptimizeLimit {
     static final int MAX_SIZE = 100000;
     PigContext pc = new PigContext( ExecType.LOCAL, new Properties() );
   
-    LogicalPlanTester planTester = new LogicalPlanTester(pc) ;
+    PigServer pigServer;
     
-    @BeforeClass
-    public static void setup() {
-        
+    @Before
+    public void setup() throws ExecException {
+        pigServer = new PigServer( pc );
     }
     
-    @AfterClass
-    public static void tearDown() {
+    @After
+    public void tearDown() {
         
     }
     
@@ -87,149 +83,123 @@ public class TestOptimizeLimit {
     @Test
     // Merget limit into sort
 	public void testOPLimit1Optimizer() throws Exception {
-	    planTester.buildPlan("A = load 'myfile';");
-	    planTester.buildPlan("B = order A by $0;");
-	    planTester.buildPlan("C = limit B 100;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );  
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+	    String query = "A = load 'myfile';" + 
+	                   "B = order A by $0;" +
+	                   "C = limit B 100;" +
+                       "store C into 'empty';";  
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
 	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan1.dot");
 	}
 
 	@Test
 	// Merge limit into limit
 	public void testOPLimit2Optimizer() throws Exception {
-	    planTester.buildPlan("A = load 'myfile';");
-	    planTester.buildPlan("B = limit A 10;");
-	    planTester.buildPlan("C = limit B 100;");
-	    org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+	    String query = "A = load 'myfile';" + 
+	                   "B = limit A 10;" + 
+	                   "C = limit B 100;" + "store C into 'empty';";
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
 	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan2.dot");
 	}
 
 	@Test
 	// Duplicate limit with two inputs
 	public void testOPLimit3Optimizer() throws Exception {
-	    planTester.buildPlan("A = load 'myfile1';");
-	    planTester.buildPlan("B = load 'myfile2';");
-	    planTester.buildPlan("C = cross A, B;");
-	    org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("D = limit C 100;");
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+	    String query = "A = load 'myfile1';" +
+	    "B = load 'myfile2';" +
+	    "C = cross A, B;" +
+	    "D = limit C 100;" + "store D into 'empty';";
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
 	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan3.dot");
 	}
 
 	@Test
 	// Duplicte limit with one input
 	public void testOPLimit4Optimizer() throws Exception {
-	    planTester.buildPlan("A = load 'myfile1';");
-	    planTester.buildPlan("B = group A by $0;");
-	    planTester.buildPlan("C = foreach B generate flatten(A);");
-	    org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("D = limit C 100;");
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+	    String query = "A = load 'myfile1';" + 
+	                   "B = group A by $0;" + "C = foreach B generate flatten(A);" + "D = limit C 100;" +
+	                   "store D into 'empty';";
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
 	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan4.dot");
 	}
 
 	@Test
 	// Move limit up
     public void testOPLimit5Optimizer() throws Exception {
-        planTester.buildPlan("A = load 'myfile1';");
-        planTester.buildPlan("B = foreach A generate $0;");
-        planTester.buildPlan("C = limit B 100;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        String query = "A = load 'myfile1';" + 
+        "B = foreach A generate $0;" +
+        "C = limit B 100;" + "store C into 'empty';" ;
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
         compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan5.dot");
     }
 	
     @Test
     // Multiple LOLimit
 	public void testOPLimit6Optimizer() throws Exception {
-	    planTester.buildPlan("A = load 'myfile';");
-	    planTester.buildPlan("B = limit A 50;");
-	    planTester.buildPlan("C = limit B 20;");
-	    planTester.buildPlan("D = limit C 100;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store D into 'empty';" );
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+	    String query = "A = load 'myfile';" +
+	    "B = limit A 50;" + 
+	    "C = limit B 20;" +
+	    "D = limit C 100;" +  "store D into 'empty';";
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
 	    compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan6.dot");
 	}
     
     @Test
     // Limit stay the same for ForEach with a flatten
     public void testOPLimit7Optimizer() throws Exception {
-        planTester.buildPlan("A = load 'myfile1';");
-        planTester.buildPlan("B = foreach A generate flatten($0);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("C = limit B 100;");
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        String query = "A = load 'myfile1';" + 
+        "B = foreach A generate flatten($0);" +
+        "C = limit B 100;" + "store C into 'empty';";
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
         compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan7.dot");
     }
     
     @Test
     //Limit in the local mode, need to make sure limit stays after a sort
     public void testOPLimit8Optimizer() throws Exception {
-        planTester.buildPlan("A = load 'myfile';");
-        planTester.buildPlan("B = order A by $0;");
-        planTester.buildPlan("C = limit B 10;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
-        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan8.dot");
+        String query = "A = load 'myfile';" + 
+        "B = order A by $0;" +
+        "C = limit B 10;" + "store C into 'empty';";
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
+       compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan8.dot");
         
     }
     
     @Test
     public void testOPLimit9Optimizer() throws Exception {
-        planTester.buildPlan("A = load 'myfile';");
-        planTester.buildPlan("B = order A by $0;");
-        planTester.buildPlan("C = limit B 10;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan( "store C into 'empty';" );
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
-        compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan9.dot");
+        String query = "A = load 'myfile';" + 
+        "B = order A by $0;" +
+        "C = limit B 10;" + "store C into 'empty';";
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
+       compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan9.dot");
         
     }
 
     @Test
     //See bug PIG-913
     public void testOPLimit10Optimizer() throws Exception {
-        planTester.buildPlan("A = load 'myfile' AS (s:chararray);");
-        planTester.buildPlan("B = limit A 100;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("C = GROUP B by $0;");
-	    LogicalPlan newLogicalPlan = migrateAndOptimizePlan(plan);
+        String query = "A = load 'myfile' AS (s:chararray);" +
+        "B = limit A 100;" + "C = GROUP B by $0;" + "store C into 'empty';";
+	    LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);;
+	    optimizePlan(newLogicalPlan);
         compareWithGoldenFile(newLogicalPlan, FILE_BASE_LOCATION + "new-optlimitplan10.dot");
     }
 
-    /**
-     * Test that {@link OpLimitOptimizer} returns false on the check if 
-     * pre-conditions for pushing limit up are not met
-     * @throws Exception
-     */
-    @Test
-    public void testOpLimitOptimizerCheck() throws Exception {
-        planTester.buildPlan("A = load 'myfile';");
-        planTester.buildPlan("B = foreach A generate $0;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("C = limit B 100;");
-        LogicalOptimizerDerivative optimizer = new LogicalOptimizerDerivative(plan);
-        int numIterations = optimizer.optimize();
-        Assert.assertFalse("Checking number of iterations of the optimizer [actual = "
-                + numIterations + ", expected < " + optimizer.getMaxIterations() + 
-                "]", optimizer.getMaxIterations() == numIterations);
-    
-    }
-
-    @Test
-    //Test to ensure that the right exception is thrown
-    public void testErrOpLimitOptimizer() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = new org.apache.pig.impl.logicalLayer.LogicalPlan();
-        OpLimitOptimizer olo = new OpLimitOptimizer(lp);
-        try {
-            olo.transform(lp.getRoots());
-        } catch(Exception e) {
-            Assert.assertTrue(((OptimizerException)e).getErrorCode() == 2052);
-        }
-    }
-    
     @Test
     //See bug PIG-995
     //We shall throw no exception here
     public void testOPLimit11Optimizer() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan plan = planTester.buildPlan("B = foreach (limit (order (load 'myfile' AS (a0, a1, a2)) by $1) 10) generate $0;");
-	    migrateAndOptimizePlan(plan);
+    	String query = "B = foreach (limit (order (load 'myfile' AS (a0, a1, a2)) by $1) 10) generate $0;";
+    	LogicalPlan plan = Util.buildLp(pigServer, query);
+	    optimizePlan(plan);
     }
 
 
@@ -258,17 +228,10 @@ public class TestOptimizeLimit {
         }
     }    
 
-    private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws IOException {
-        LogicalPlan newLogicalPlan = migratePlan( plan );
-        PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
+    private LogicalPlan optimizePlan(LogicalPlan plan) throws IOException {
+        PlanOptimizer optimizer = new MyPlanOptimizer( plan, 3 );
         optimizer.optimize();
-        return newLogicalPlan;
+        return plan;
     }
 
-    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
-        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
-        visitor.visit();
-        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
-        return newPlan;
-    }
 }

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterOptimization.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterOptimization.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterOptimization.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterOptimization.java Tue May  3 16:58:19 2011
@@ -1,576 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.pig.test;
-
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Properties;
-import java.util.regex.Pattern;
-
-import junit.framework.TestCase;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.mapreduce.InputFormat;
-import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.mapreduce.RecordReader;
-import org.apache.pig.ExecType;
-import org.apache.pig.Expression;
-import org.apache.pig.LoadFunc;
-import org.apache.pig.LoadMetadata;
-import org.apache.pig.PigServer;
-import org.apache.pig.ResourceSchema;
-import org.apache.pig.ResourceStatistics;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOFilter;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.PColFilterExtractor;
-import org.apache.pig.impl.logicalLayer.PlanSetter;
-import org.apache.pig.impl.logicalLayer.parser.ParseException;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.test.utils.LogicalPlanTester;
-import org.junit.Test;
-
-/**
- * unit tests to test extracting partition filter conditions out of the filter
- * condition in the filter following a load which talks to metadata system (.i.e.
- * implements {@link LoadMetadata})
- */
-public class TestPartitionFilterOptimization extends TestCase {
-
-    PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
-    LogicalPlanTester lpTester;
-    
-    @Override
-    protected void setUp() throws Exception {
-        lpTester = new LogicalPlanTester(pc);
-        lpTester.buildPlan("a = load 'foo' as (srcid, mrkt, dstid, name, age);");
-    }
-
-    /**
-     * test case where there is a single expression on partition columns in 
-     * the filter expression along with an expression on non partition column
-     * @throws FrontendException 
-     */
-    @Test
-    public void testSimpleMixed() throws FrontendException {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by srcid == 10 and name == 'foo';");
-        test(lp, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
-    }
-    
-    /**
-     * test case where filter does not contain any condition on partition cols
-     * @throws Exception
-     */
-    @Test
-    public void testNoPartFilter() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by age == 20 and name == 'foo';");
-        test(lp, Arrays.asList("srcid"), null, 
-                "((age == 20) and (name == 'foo'))");
-    }
-    
-    /**
-     * test case where filter only contains condition on partition cols
-     * @throws Exception
-     */
-    @Test
-    public void testOnlyPartFilter1() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by srcid > 20 and mrkt == 'us';");
-        test(lp, Arrays.asList("srcid", "mrkt"), 
-                    "((srcid > 20) and (mrkt == 'us'))", null);
-        
-    }
-    
-    /**
-     * test case where filter only contains condition on partition cols
-     * @throws Exception
-     */
-    @Test
-    public void testOnlyPartFilter2() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by mrkt == 'us';");
-        test(lp, Arrays.asList("srcid", "mrkt"), 
-                    "(mrkt == 'us')", null);
-        
-    }
-    
-    /**
-     * test case where filter only contains condition on partition cols
-     * @throws Exception
-     */
-    @Test
-    public void testOnlyPartFilter3() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by srcid == 20 or mrkt == 'us';");
-        test(lp, Arrays.asList("srcid", "mrkt"), 
-                    "((srcid == 20) or (mrkt == 'us'))", null);
-        
-    }
-    
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns
-     */
-    @Test
-    public void testMixed1() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-            		"(age < 20 and  mrkt == 'us') and (srcid == 10 and " +
-            		"name == 'foo');");
-        test(lp, Arrays.asList("srcid", "mrkt"), 
-                "((mrkt == 'us') and (srcid == 10))", 
-                "((age < 20) and (name == 'foo'))");
-    }
-    
-    
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns
-     */
-    @Test
-    public void testMixed2() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "(age >= 20 and  mrkt == 'us') and (srcid == 10 and " +
-                    "dstid == 15);");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))", 
-                "(age >= 20)");
-    }
-    
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns
-     */
-    @Test
-    public void testMixed3() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "age >= 20 and  mrkt == 'us' and srcid == 10;");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
-    }
-    
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns - this testcase also has a condition
-     * based on comparison of two partition columns
-     */
-    @Test
-    public void testMixed4() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "age >= 20 and  mrkt == 'us' and name == 'foo' and " +
-                    "srcid == dstid;");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "((mrkt == 'us') and (srcid == dstid))", 
-                "((age >= 20) and (name == 'foo'))");
-    }
-    
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns - 
-     * This testcase has two partition col conditions  with OR +  non parition 
-     * col conditions
-     */
-    @Test
-    public void testMixed5() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
-                    "dstid == 30;");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))", 
-                "(name == 'foo')");
-    }
-    
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns - 
-     * This testcase has two partition col conditions  with OR +  non parition 
-     * col conditions
-     */
-    @Test
-    public void testMixed6() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))", 
-                "(name == 'foo')");
-    }
-    /**
-     * test case where filter has both conditions on partition cols and non
-     * partition cols and the filter condition will be split to extract the
-     * conditions on partition columns. This testcase also tests arithmetic
-     * in partition column conditions
-     */
-    @Test
-    public void testMixedArith() throws Exception {
-        LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
-                "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))", 
-                "(age != 15)");
-    }
-    
-    @Test
-    public void testNegPColConditionWithNonPCol() throws Exception {
-        // use of partition column condition and non partition column in 
-        // same condition should fail
-        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-                    "srcid > age;");
-        negativeTest(lp, Arrays.asList("srcid"), 1111);
-        lp =  lpTester.buildPlan("b = filter a by " +
-                    "srcid + age == 20;");
-        negativeTest(lp, Arrays.asList("srcid"), 1111);
-
-        // OR of partition column condition and non partiton col condition 
-        // should fail
-        lp = lpTester.buildPlan("b = filter a by " +
-                    "srcid > 10 or name == 'foo';");
-        negativeTest(lp, Arrays.asList("srcid"), 1111);
-    }
-    
-    @Test
-    public void testNegPColInWrongPlaces() throws Exception {
-        
-        int expectedErrCode = 1112;
-        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-        "(srcid > 10 and name == 'foo') or dstid == 10;");
-        negativeTest(lp, Arrays.asList("srcid", "dstid"), expectedErrCode); 
-        
-        expectedErrCode = 1110;
-        lp = lpTester.buildPlan("b = filter a by " +
-                "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-                "mrkt matches '.*us.*' and age < 15;");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-                "(int)mrkt == 10 and name matches '.*foo.*';");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"),expectedErrCode);
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-            "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-            "(mrkt is null) and name matches '.*foo.*';");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-            "(mrkt is not null) and name matches '.*foo.*';");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
-    }
-    
-    
-    /**
-     * Test that pig sends correct partition column names in setPartitionFilter
-     * when the user has a schema in the load statement which renames partition
-     * columns
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping1() throws Exception {
-        TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
-            + TestLoader.class.getName() + 
-            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid,mrkt') as (f1, f2, f3, f4, f5);");
-        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-        		"(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
-        lpTester.typeCheckPlan(lp);
-        lpTester.optimizePlan(lp);
-        assertEquals("checking partition filter:",             
-                    "((mrkt == 'us') and (srcid == 10))",
-                    TestLoader.partFilter.toString());
-        LOFilter filter = (LOFilter) lp.getLeaves().get(0);
-        String actual = PColFilterExtractor.getExpression(
-                (ExpressionOperator) filter.getComparisonPlan().
-                getLeaves().get(0)).
-                toString().toLowerCase();
-        assertEquals("checking trimmed filter expression:", 
-                "((f5 >= 20) and (f3 == 15))", actual);
-    }
-    
-    
-    /**
-     * Test that pig sends correct partition column names in setPartitionFilter
-     * when the user has a schema in the load statement which renames partition
-     * columns - in this test case there is no condition on partition columns
-     * - so setPartitionFilter() should not be called and the filter condition
-     * should remain as is.
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping2() throws Exception {
-        TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
-            + TestLoader.class.getName() + 
-            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid') as (f1, f2, f3, f4, f5);");
-        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-                "f5 >= 20 and f2 == 'us' and f3 == 15;");
-        lpTester.typeCheckPlan(lp);
-        lpTester.optimizePlan(lp);
-        assertEquals("checking partition filter:",             
-                    null,
-                    TestLoader.partFilter);
-        LOFilter filter = (LOFilter) lp.getLeaves().get(0);
-        String actual = PColFilterExtractor.getExpression(
-                (ExpressionOperator) filter.getComparisonPlan().
-                getLeaves().get(0)).
-                toString().toLowerCase();
-        assertEquals("checking trimmed filter expression:", 
-                "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual);
-    }
-    
-    /**
-     * Test that pig sends correct partition column names in setPartitionFilter
-     * when the user has a schema in the load statement which renames partition
-     * columns - in this test case the filter only has conditions on partition
-     * columns
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping3() throws Exception {
-        TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
-            + TestLoader.class.getName() + 
-            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);");
-        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-                "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);");
-        lpTester.typeCheckPlan(lp);
-        lpTester.optimizePlan(lp);
-        assertEquals("checking partition filter:",             
-                    "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
-                    "(dstid == 15)))",
-                    TestLoader.partFilter.toString());
-        Iterator<LogicalOperator> it = lp.iterator();
-        assertTrue("Checking that filter has been removed since it contained" +
-        		" only conditions on partition cols:", 
-        		(it.next() instanceof LOLoad));
-        assertFalse("Checking that filter has been removed since it contained" +
-                " only conditions on partition cols:", 
-                it.hasNext());
-        
-    }
-    
-    /**
-     * Test that pig sends correct partition column names in setPartitionFilter
-     * when the user has a schema in the load statement which renames partition
-     * columns - in this test case the schema in load statement is a prefix 
-     * (with columns renamed) of the schema returned by 
-     * {@link LoadMetadata#getSchema(String, Configuration)}
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping4() throws Exception {
-        TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
-            + TestLoader.class.getName() + 
-            "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid,mrkt') as (f1, f2, f3);");
-        LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-                "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
-        lpTester.typeCheckPlan(lp);
-        lpTester.optimizePlan(lp);
-        assertEquals("checking partition filter:",             
-                    "((mrkt == 'us') and (srcid == 10))",
-                    TestLoader.partFilter.toString());
-        LOFilter filter = (LOFilter) lp.getLeaves().get(0);
-        String actual = PColFilterExtractor.getExpression(
-                (ExpressionOperator) filter.getComparisonPlan().
-                getLeaves().get(0)).
-                toString().toLowerCase();
-        assertEquals("checking trimmed filter expression:", 
-                "((age >= 20) and (f3 == 15))", actual);
-    }
-    
-    /**
-     * Test PIG-1267
-     * @throws Exception
-     */
-    @Test
-    public void testColNameMapping5() throws Exception {
-        TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
-            + TestLoader.class.getName() + 
-            "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
-            "'srcid');");
-        lpTester.buildPlan("b = load 'bar' using "
-                + TestLoader.class.getName() + 
-                "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
-                "'srcid');");
-        lpTester.buildPlan("a1 = filter a by srcid == 10;");
-        lpTester.buildPlan("b1 = filter b by srcid == 20;");
-        lpTester.buildPlan("c = join a1 by bcookie, b1 by bcookie;");
-        LogicalPlan lp = lpTester
-                .buildPlan("d = foreach c generate $4 as bcookie:chararray, " +
-                		"$5 as dstid:int, $0 as mrkt:chararray;");
-        
-        new PlanSetter(lp).visit();
-        
-        lpTester.typeCheckPlan(lp);
-        lpTester.optimizePlan(lp);
- 
-        assertEquals("checking partition filter:",             
-                    "(srcid == 20)",
-                    TestLoader.partFilter.toString());
-        
-        int counter = 0;
-        Iterator<LogicalOperator> iter = lp.getKeys().values().iterator();
-        while (iter.hasNext()) {
-            assertTrue(!(iter.next() instanceof LOFilter));
-            counter++;
-        }      
-        assertEquals(counter, 6);
-    }
-    
-    //// helper methods ///////
-    
-    private PColFilterExtractor test(LogicalPlan lp, List<String> partitionCols, 
-            String expPartFilterString, String expFilterString) 
-    throws FrontendException {
-        LOFilter filter = (LOFilter)lp.getLeaves().get(0);
-        PColFilterExtractor pColExtractor = new PColFilterExtractor(
-                filter.getComparisonPlan(), partitionCols);
-        pColExtractor.visit();
-        
-        if(expPartFilterString == null) {
-            assertEquals("Checking partition column filter:", null, 
-                    pColExtractor.getPColCondition());
-        } else  {
-            assertEquals("Checking partition column filter:", 
-                    expPartFilterString.toLowerCase(), 
-                    pColExtractor.getPColCondition().toString().toLowerCase());   
-        }
-        
-        if(expFilterString == null) {
-            assertTrue("Check that filter can be removed:", 
-                    pColExtractor.isFilterRemovable());
-        } else {
-            String actual = PColFilterExtractor.getExpression(
-                                (ExpressionOperator) filter.getComparisonPlan().
-                                getLeaves().get(0)).
-                                toString().toLowerCase();
-            assertEquals("checking trimmed filter expression:", expFilterString,
-                    actual);
-        }
-        return pColExtractor;
-    }
-    
-    private void negativeTest(LogicalPlan lp, List<String> partitionCols,
-            int expectedErrorCode) {
-        LOFilter filter = (LOFilter)lp.getLeaves().get(0);
-        PColFilterExtractor pColExtractor = new PColFilterExtractor(
-                filter.getComparisonPlan(), partitionCols);
-        try {
-            pColExtractor.visit();
-        } catch(Exception e) {
-            assertEquals("Checking if exception has right error code", 
-                    expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
-            return;
-        }
-        fail("Exception expected!");
-    }
-    
-    /**
-     * this loader is only used to test that parition column filters are given
-     * in the manner expected in terms of column names - hence it does not
-     * implement many of the methods and only implements required ones.
-     */
-    public static class TestLoader extends LoadFunc implements LoadMetadata {
-
-        Schema schema;
-        String[] partCols;
-        static Expression partFilter = null;
-        
-        public TestLoader(String schemaString, String commaSepPartitionCols) 
-        throws ParseException {
-            schema = Util.getSchemaFromString(schemaString);
-            partCols = commaSepPartitionCols.split(",");
-        }
-        
-        @Override
-        public InputFormat getInputFormat() throws IOException {
-            return null;
-        }
-
-        @Override
-        public Tuple getNext() throws IOException {
-            return null;
-        }
-
-        @Override
-        public void prepareToRead(RecordReader reader, PigSplit split)
-                throws IOException {
-        }
-
-        @Override
-        public void setLocation(String location, Job job) throws IOException {
-        }
-
-        @Override
-        public String[] getPartitionKeys(String location, Job job)
-                throws IOException {
-            return partCols;
-        }
-
-        @Override
-        public ResourceSchema getSchema(String location, Job job)
-                throws IOException {
-            return new ResourceSchema(schema);
-        }
-
-        @Override
-        public ResourceStatistics getStatistics(String location,
-                Job job) throws IOException {
-            return null;
-        }
-
-        @Override
-        public void setPartitionFilter(Expression partitionFilter)
-                throws IOException {
-            partFilter = partitionFilter;            
-        }
-        
-    }
-}