You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by xu...@apache.org on 2011/05/03 18:58:21 UTC

svn commit: r1099123 [7/16] - in /pig/branches/branch-0.9: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/impl/ src/org/apache/pig/impl/logi...

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterPushDown.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterPushDown.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterPushDown.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestPartitionFilterPushDown.java Tue May  3 16:58:19 2011
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Properties;
 import java.util.Set;
 
+import junit.framework.AssertionFailedError;
+
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.InputFormat;
@@ -35,16 +37,14 @@ import org.apache.pig.ExecType;
 import org.apache.pig.Expression;
 import org.apache.pig.LoadFunc;
 import org.apache.pig.LoadMetadata;
+import org.apache.pig.PigServer;
 import org.apache.pig.ResourceSchema;
 import org.apache.pig.ResourceStatistics;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigSplit;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.expression.LogicalExpression;
-import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.newplan.logical.relational.LOFilter;
-import org.apache.pig.newplan.logical.relational.LOLoad;
 import org.apache.pig.newplan.logical.relational.LogicalPlan;
 import org.apache.pig.newplan.logical.rules.PartitionFilterOptimizer;
 import org.apache.pig.newplan.logical.rules.LoadTypeCastInserter;
@@ -53,18 +53,12 @@ import org.apache.pig.newplan.OperatorPl
 import org.apache.pig.newplan.OperatorSubPlan;
 import org.apache.pig.newplan.PColFilterExtractor;
 import org.apache.pig.newplan.optimizer.PlanOptimizer;
-import org.apache.pig.newplan.optimizer.PlanTransformListener;
 import org.apache.pig.newplan.optimizer.Rule;
 import org.apache.pig.newplan.optimizer.Transformer;
 import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.PlanSetter;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.LogUtils;
-import org.apache.pig.test.TestPartitionFilterOptimization.TestLoader;
-import org.apache.pig.test.utils.LogicalPlanTester;
 import org.junit.AfterClass;
 import org.junit.Assert;
 import org.junit.BeforeClass;
@@ -77,12 +71,10 @@ import org.junit.Test;
  */
 public class TestPartitionFilterPushDown {
     static PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
-    static LogicalPlanTester lpTester;
-    
+    String query = "a = load 'foo' as (srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int);";
+
     @BeforeClass
     public static void setup() throws Exception {
-        lpTester = new LogicalPlanTester(pc);
-        lpTester.buildPlan("a = load 'foo' as (srcid, mrkt, dstid, name, age);");
     }
 
     @AfterClass
@@ -92,66 +84,61 @@ public class TestPartitionFilterPushDown
     /**
      * test case where there is a single expression on partition columns in 
      * the filter expression along with an expression on non partition column
-     * @throws IOException 
+     * @throws Exception 
      */
     @Test
-    public void testSimpleMixed() throws IOException {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by srcid == 10 and name == 'foo';");
-        test(lp, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
+    public void testSimpleMixed() throws Exception {
+        String q = query + "b = filter a by srcid == 10 and name == 'foo';" + "store b into 'out';";
+        test(q, Arrays.asList("srcid"), "(srcid == 10)", "(name == 'foo')");
     }
-    
+
     /**
      * test case where filter does not contain any condition on partition cols
      * @throws Exception
      */
     @Test
     public void testNoPartFilter() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by age == 20 and name == 'foo';");
-        test(lp, Arrays.asList("srcid"), null, 
-                "((age == 20) and (name == 'foo'))");
+        String q = query + "b = filter a by age == 20 and name == 'foo';" + "store b into 'out';";
+        test(q, Arrays.asList("srcid"), null, 
+        "((age == 20) and (name == 'foo'))");
     }
-    
+
     /**
      * test case where filter only contains condition on partition cols
      * @throws Exception
      */
     @Test
     public void testOnlyPartFilter1() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by srcid > 20 and mrkt == 'us';");
-        test(lp, Arrays.asList("srcid", "mrkt"), 
-                    "((srcid > 20) and (mrkt == 'us'))", null);
-        
+        String q = query + "b = filter a by srcid > 20 and mrkt == 'us';" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"), 
+                "((srcid > 20) and (mrkt == 'us'))", null);
+
     }
-    
+
     /**
      * test case where filter only contains condition on partition cols
      * @throws Exception
      */
     @Test
     public void testOnlyPartFilter2() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by mrkt == 'us';");
-        test(lp, Arrays.asList("srcid", "mrkt"), 
-                    "(mrkt == 'us')", null);
-        
+        String q = query + "b = filter a by mrkt == 'us';" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"), 
+                "(mrkt == 'us')", null);
+
     }
-    
+
     /**
      * test case where filter only contains condition on partition cols
      * @throws Exception
      */
     @Test
     public void testOnlyPartFilter3() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by srcid == 20 or mrkt == 'us';");
-        test(lp, Arrays.asList("srcid", "mrkt"), 
-                    "((srcid == 20) or (mrkt == 'us'))", null);
-        
+        String q = query + "b = filter a by srcid == 20 or mrkt == 'us';" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"), 
+                "((srcid == 20) or (mrkt == 'us'))", null);
+
     }
-    
+
     /**
      * test case where filter has both conditions on partition cols and non
      * partition cols and the filter condition will be split to extract the
@@ -159,16 +146,15 @@ public class TestPartitionFilterPushDown
      */
     @Test
     public void testMixed1() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-            		"(age < 20 and  mrkt == 'us') and (srcid == 10 and " +
-            		"name == 'foo');");
-        test(lp, Arrays.asList("srcid", "mrkt"), 
+        String q = query + "b = filter a by " +
+        "(age < 20 and  mrkt == 'us') and (srcid == 10 and " +
+        "name == 'foo');" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "mrkt"), 
                 "((mrkt == 'us') and (srcid == 10))", 
-                "((age < 20) and (name == 'foo'))");
+        "((age < 20) and (name == 'foo'))");
     }
-    
-    
+
+
     /**
      * test case where filter has both conditions on partition cols and non
      * partition cols and the filter condition will be split to extract the
@@ -176,15 +162,14 @@ public class TestPartitionFilterPushDown
      */
     @Test
     public void testMixed2() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "(age >= 20 and  mrkt == 'us') and (srcid == 10 and " +
-                    "dstid == 15);");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+        String q = query + "b = filter a by " +
+        "(age >= 20 and  mrkt == 'us') and (srcid == 10 and " +
+        "dstid == 15);" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
                 "((mrkt == 'us') and ((srcid == 10) and (dstid == 15)))", 
-                "(age >= 20)");
+        "(age >= 20)");
     }
-    
+
     /**
      * test case where filter has both conditions on partition cols and non
      * partition cols and the filter condition will be split to extract the
@@ -192,13 +177,12 @@ public class TestPartitionFilterPushDown
      */
     @Test
     public void testMixed3() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "age >= 20 and  mrkt == 'us' and srcid == 10;");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+        String q = query + "b = filter a by " +
+        "age >= 20 and  mrkt == 'us' and srcid == 10;" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
                 "((mrkt == 'us') and (srcid == 10))", "(age >= 20)");
     }
-    
+
     /**
      * test case where filter has both conditions on partition cols and non
      * partition cols and the filter condition will be split to extract the
@@ -207,15 +191,14 @@ public class TestPartitionFilterPushDown
      */
     @Test
     public void testMixed4() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "age >= 20 and  mrkt == 'us' and name == 'foo' and " +
-                    "srcid == dstid;");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+        String q = query + "b = filter a by " +
+        "age >= 20 and  mrkt == 'us' and name == 'foo' and " +
+        "srcid == dstid;" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
                 "((mrkt == 'us') and (srcid == dstid))", 
-                "((age >= 20) and (name == 'foo'))");
+        "((age >= 20) and (name == 'foo'))");
     }
-    
+
     /**
      * test case where filter has both conditions on partition cols and non
      * partition cols and the filter condition will be split to extract the
@@ -225,15 +208,14 @@ public class TestPartitionFilterPushDown
      */
     @Test
     public void testMixed5() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
-                    "dstid == 30;");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+        String q = query + "b = filter a by " +
+        "(srcid == 10 or mrkt == 'us') and name == 'foo' and " +
+        "dstid == 30;" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
                 "(((srcid == 10) or (mrkt == 'us')) and (dstid == 30))", 
-                "(name == 'foo')");
+        "(name == 'foo')");
     }
-    
+
     /**
      * test case where filter has both conditions on partition cols and non
      * partition cols and the filter condition will be split to extract the
@@ -243,23 +225,20 @@ public class TestPartitionFilterPushDown
      */
     @Test
     public void testMixed6() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+        String q = query + "b = filter a by " +
+        "dstid == 30 and (srcid == 10 or mrkt == 'us') and name == 'foo';" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
                 "((dstid == 30) and ((srcid == 10) or (mrkt == 'us')))", 
-                "(name == 'foo')");
+        "(name == 'foo')");
     }
     
     @Test
     public void test7() throws Exception {
-        LogicalPlanTester tester = new LogicalPlanTester(pc);
-        tester.buildPlan("a = load 'foo' using " + TestLoader.class.getName()
-                + "('srcid, mrkt, dstid, name, age', 'srcid, name');");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = tester
-                .buildPlan("b = filter a by "
-                        + "(srcid < 20 and age < 30) or (name == 'foo' and age > 40);");
-        LogicalPlan plan = migratePlan(lp);
+        String query = "a = load 'foo' using " + TestLoader.class.getName() + 
+            "('srcid, mrkt, dstid, name, age', 'srcid, name');" +
+            "b = filter a by (srcid < 20 and age < 30) or (name == 'foo' and age > 40);" +
+            "store b into 'output';";
+        LogicalPlan plan = buildPlan(new PigServer(pc), query);
         
         Rule rule = new PartitionFilterOptimizer("test");
         List<OperatorPlan> matches = rule.match(plan);
@@ -279,13 +258,11 @@ public class TestPartitionFilterPushDown
     
     @Test
     public void test8() throws Exception {
-        LogicalPlanTester tester = new LogicalPlanTester(pc);
-        tester.buildPlan("a = load 'foo' using " + TestLoader.class.getName()
-                + "('srcid, mrkt, dstid, name, age', 'srcid,name');");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = tester
-                .buildPlan("b = filter a by "
-                        + "(srcid < 20) or (name == 'foo');");
-        LogicalPlan plan = migratePlan(lp);
+        String query = "a = load 'foo' using " + TestLoader.class.getName() +
+            "('srcid, mrkt, dstid, name, age', 'srcid,name');" +
+            "b = filter a by (srcid < 20) or (name == 'foo');" + 
+            "store b into 'output';";
+        LogicalPlan plan = Util.buildLp(new PigServer(pc), query);
         
         Rule rule = new PartitionFilterOptimizer("test");
         List<OperatorPlan> matches = rule.match(plan);
@@ -298,7 +275,7 @@ public class TestPartitionFilterPushDown
             }
             OperatorSubPlan newPlan = (OperatorSubPlan)transformer.reportChanges();
 
-            Assert.assertTrue(newPlan.getBasePlan().size() == 1);
+            Assert.assertTrue(newPlan.getBasePlan().size() == 3);
         }
   
     }
@@ -312,100 +289,102 @@ public class TestPartitionFilterPushDown
      */
     @Test
     public void testMixedArith() throws Exception {
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = 
-            lpTester.buildPlan("b = filter a by " +
-                    "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;");
-        test(lp, Arrays.asList("srcid", "dstid", "mrkt"), 
+        String q = query + "b = filter a by " +
+        "mrkt == 'us' and srcid * 10 == 150 + 20 and age != 15;" + "store b into 'out';";
+        test(q, Arrays.asList("srcid", "dstid", "mrkt"), 
                 "((mrkt == 'us') and ((srcid * 10) == (150 + 20)))", 
-                "(age != 15)");
+        "(age != 15)");
     }
-    
+
     @Test
     public void testNegPColConditionWithNonPCol() throws Exception {
         // use of partition column condition and non partition column in 
         // same condition should fail
-    	org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-                    "srcid > age;");
-        negativeTest(lp, Arrays.asList("srcid"));
-        lp =  lpTester.buildPlan("b = filter a by " +
-                    "srcid + age == 20;");
-        negativeTest(lp, Arrays.asList("srcid"));
+        String q = query + "b = filter a by " +
+        "srcid > age;" + "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid"), 1111);
+        q = query + "b = filter a by " +
+        "srcid + age == 20;" + "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid"), 1111);
 
         // OR of partition column condition and non partiton col condition 
         // should fail
-        lp = lpTester.buildPlan("b = filter a by " +
-                    "srcid > 10 or name == 'foo';");
-        negativeTest(lp, Arrays.asList("srcid"));
+        q = query + "b = filter a by " +
+        "srcid > 10 or name == 'foo';" +
+        "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid"), 1111);
     }
-    
+
     @Test
     public void testNegPColInWrongPlaces() throws Exception {
+        int expectedErrCode = 1112;
 
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-        "(srcid > 10 and name == 'foo') or dstid == 10;");
-        negativeTest(lp, Arrays.asList("srcid", "dstid")); 
-
-        lp = lpTester.buildPlan("b = filter a by " +
-                "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"));
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-                "mrkt matches '.*us.*' and age < 15;");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"));
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-                "(int)mrkt == 10 and name matches '.*foo.*';");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"));
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-            "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"));
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-            "(mrkt is null) and name matches '.*foo.*';");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"));
-        
-        lp = lpTester.buildPlan("b = filter a by " +
-            "(mrkt is not null) and name matches '.*foo.*';");
-        negativeTest(lp, Arrays.asList("srcid", "dstid", "mrkt"));
-    }
-    
-    @Test
-    public void testNegPColInWrongPlaces2() throws Exception {
-        
-        LogicalPlanTester tester = new LogicalPlanTester(pc);
-        tester.buildPlan("a = load 'foo' using " + TestLoader.class.getName()
-                + "('srcid, mrkt, dstid, name, age', 'srcid,dstid,mrkt');");
-        
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = tester
-                .buildPlan("b = filter a by "
-                        + "(srcid > 10 and name == 'foo') or dstid == 10;");
-        negativeTest(lp); 
-        
-        lp = tester.buildPlan("b = filter a by " +
-                "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
-        negativeTest(lp);
-        
-        lp = tester.buildPlan("b = filter a by " +
-                "mrkt matches '.*us.*' and age < 15;");
-        negativeTest(lp);
-        
-        lp = tester.buildPlan("b = filter a by " +
-                "(int)mrkt == 10 and name matches '.*foo.*';");
-        negativeTest(lp);
-        
-        lp = tester.buildPlan("b = filter a by " +
-            "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
-        negativeTest(lp);
-        
-        lp = tester.buildPlan("b = filter a by " +
-            "(mrkt is null) and name matches '.*foo.*';");
-        negativeTest(lp);
-        
-        lp = tester.buildPlan("b = filter a by " +
-            "(mrkt is not null) and name matches '.*foo.*';");
-        negativeTest(lp);
-    }
+        String q = query + "b = filter a by " +
+        "(srcid > 10 and name == 'foo') or dstid == 10;" + "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "dstid"), expectedErrCode); 
+
+        expectedErrCode = 1110;
+        q = query + "b = filter a by " +
+        "CONCAT(mrkt, '_10') == 'US_10' and age == 20;" + "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+
+        q = query + "b = filter a by " +
+        "mrkt matches '.*us.*' and age < 15;" + "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+
+        q = query + "b = filter a by " +
+        "(int)mrkt == 10 and name matches '.*foo.*';" + "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"),expectedErrCode);
+
+        q = query + "b = filter a by " +
+        "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';" + "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+
+        q = query + "b = filter a by " +
+        "(mrkt is null) and name matches '.*foo.*';" + "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+
+        q = query + "b = filter a by " +
+        "(mrkt is not null) and name matches '.*foo.*';" + "store b into 'out';";
+        negativeTest(q, Arrays.asList("srcid", "dstid", "mrkt"), expectedErrCode);
+    }
+    
+//    @Test
+//    public void testNegPColInWrongPlaces2() throws Exception {
+//        
+//        LogicalPlanTester tester = new LogicalPlanTester(pc);
+//        tester.buildPlan("a = load 'foo' using " + TestLoader.class.getName()
+//                + "('srcid, mrkt, dstid, name, age', 'srcid,dstid,mrkt');");
+//        
+//        org.apache.pig.impl.logicalLayer.LogicalPlan lp = tester
+//                .buildPlan("b = filter a by "
+//                        + "(srcid > 10 and name == 'foo') or dstid == 10;");
+//        negativeTest(lp); 
+//        
+//        lp = tester.buildPlan("b = filter a by " +
+//                "CONCAT(mrkt, '_10') == 'US_10' and age == 20;");
+//        negativeTest(lp);
+//        
+//        lp = tester.buildPlan("b = filter a by " +
+//                "mrkt matches '.*us.*' and age < 15;");
+//        negativeTest(lp);
+//        
+//        lp = tester.buildPlan("b = filter a by " +
+//                "(int)mrkt == 10 and name matches '.*foo.*';");
+//        negativeTest(lp);
+//        
+//        lp = tester.buildPlan("b = filter a by " +
+//            "(mrkt == 'us' ? age : age + 10) == 40 and name matches '.*foo.*';");
+//        negativeTest(lp);
+//        
+//        lp = tester.buildPlan("b = filter a by " +
+//            "(mrkt is null) and name matches '.*foo.*';");
+//        negativeTest(lp);
+//        
+//        lp = tester.buildPlan("b = filter a by " +
+//            "(mrkt is not null) and name matches '.*foo.*';");
+//        negativeTest(lp);
+//    }
     
     
     /**
@@ -417,19 +396,21 @@ public class TestPartitionFilterPushDown
     @Test
     public void testColNameMapping1() throws Exception {
         TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
+        String q = "a = load 'foo' using "
             + TestLoader.class.getName() + 
             "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid,mrkt') as (f1, f2, f3, f4, f5);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-        		"(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
-        
+            "'srcid,mrkt') as (f1, f2, f3, f4, f5);" +
+            "b = filter a by " +
+            "(f5 >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" + 
+            "store b into 'out';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
+
         Assert.assertEquals("checking partition filter:",             
-                    "((mrkt == 'us') and (srcid == 10))",
-                    TestLoader.partFilter.toString());
-        LOFilter filter = (LOFilter)newLogicalPlan.getSinks().get(0);
+                "((mrkt == 'us') and (srcid == 10))",
+                TestLoader.partFilter.toString());
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
         
         PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
         
@@ -439,15 +420,16 @@ public class TestPartitionFilterPushDown
         Assert.assertEquals("checking trimmed filter expression:", 
                 "((f5 >= 20) and (f3 == 15))", actual);
     }
-    
-    private LogicalPlan migrateAndOptimizePlan(org.apache.pig.impl.logicalLayer.LogicalPlan plan) throws IOException {
-        LogicalPlan newLogicalPlan = migratePlan( plan );
+
+    private LogicalPlan migrateAndOptimizePlan(String query) throws Exception {
+        PigServer pigServer = new PigServer( pc );
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
         PlanOptimizer optimizer = new MyPlanOptimizer( newLogicalPlan, 3 );
         optimizer.optimize();
         return newLogicalPlan;
     }
-    
-    
+
+
     /**
      * Test that pig sends correct partition column names in setPartitionFilter
      * when the user has a schema in the load statement which renames partition
@@ -459,19 +441,21 @@ public class TestPartitionFilterPushDown
     @Test
     public void testColNameMapping2() throws Exception {
         TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
+        String q = "a = load 'foo' using "
             + TestLoader.class.getName() + 
             "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid') as (f1, f2, f3, f4, f5);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-                "f5 >= 20 and f2 == 'us' and f3 == 15;");
+            "'srcid') as (f1, f2, f3, f4, f5);" +
+            "b = filter a by " +
+            "f5 >= 20 and f2 == 'us' and f3 == 15;" +
+            "store b into 'out';";
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
 
         Assert.assertEquals("checking partition filter:",             
-                    null,
-                    TestLoader.partFilter);
-        LOFilter filter = (LOFilter) newLogicalPlan.getSinks().get(0);
+                null,
+                TestLoader.partFilter);
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
         
         PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
         
@@ -482,7 +466,7 @@ public class TestPartitionFilterPushDown
         Assert.assertEquals("checking trimmed filter expression:", 
                 "(((f5 >= 20) and (f2 == 'us')) and (f3 == 15))", actual);
     }
-    
+
     /**
      * Test that pig sends correct partition column names in setPartitionFilter
      * when the user has a schema in the load statement which renames partition
@@ -493,29 +477,28 @@ public class TestPartitionFilterPushDown
     @Test
     public void testColNameMapping3() throws Exception {
         TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
+        String query = "a = load 'foo' using "
             + TestLoader.class.getName() + 
             "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-                "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);");
+            "'srcid,mrkt,dstid,age') as (f1, f2, f3, f4, f5);" +
+            "b = filter a by " +
+            "(f5 >= 20 or f2 == 'us') and (f1 == 10 and f3 == 15);" + 
+            "store b into 'out';";
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( query );
 
         Assert.assertEquals("checking partition filter:",             
-                    "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
-                    "(dstid == 15)))",
-                    TestLoader.partFilter.toString());
+                "(((age >= 20) or (mrkt == 'us')) and ((srcid == 10) and " +
+                "(dstid == 15)))",
+                TestLoader.partFilter.toString());
         Iterator<Operator> it = newLogicalPlan.getOperators();
-        Assert.assertTrue("Checking that filter has been removed since it contained" +
-        		" only conditions on partition cols:", 
-        		(it.next() instanceof LOLoad));
-        Assert.assertFalse("Checking that filter has been removed since it contained" +
-                " only conditions on partition cols:", 
-                it.hasNext());
-        
+        while( it.hasNext() ) {
+	        Assert.assertFalse("Checking that filter has been removed since it contained" +
+	                " only conditions on partition cols:", 
+	                (it.next() instanceof LOFilter));
+        }
     }
-    
+
     /**
      * Test that pig sends correct partition column names in setPartitionFilter
      * when the user has a schema in the load statement which renames partition
@@ -527,19 +510,20 @@ public class TestPartitionFilterPushDown
     @Test
     public void testColNameMapping4() throws Exception {
         TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
+        String q = "a = load 'foo' using "
             + TestLoader.class.getName() + 
             "('srcid:int, mrkt:chararray, dstid:int, name:chararray, age:int', " +
-            "'srcid,mrkt') as (f1, f2, f3);");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester.buildPlan("b = filter a by " +
-                "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);");
+            "'srcid,mrkt') as (f1:int, f2:chararray, f3:int, name:chararray, age:int);" +
+            "b = filter a by " +
+            "(age >= 20 and f2 == 'us') and (f1 == 10 and f3 == 15);" + "store b into 'out';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
 
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
-        
         Assert.assertEquals("checking partition filter:",             
-                    "((mrkt == 'us') and (srcid == 10))",
-                    TestLoader.partFilter.toString());
-        LOFilter filter = (LOFilter) newLogicalPlan.getSinks().get(0);
+                "((mrkt == 'us') and (srcid == 10))",
+                TestLoader.partFilter.toString());
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
         
         PColFilterExtractor extractor = new PColFilterExtractor(filter.getFilterPlan(), new ArrayList<String>());
         
@@ -549,7 +533,7 @@ public class TestPartitionFilterPushDown
         Assert.assertEquals("checking trimmed filter expression:", 
                 "((age >= 20) and (f3 == 15))", actual);
     }
-    
+
     /**
      * Test PIG-1267
      * @throws Exception
@@ -557,109 +541,87 @@ public class TestPartitionFilterPushDown
     @Test
     public void testColNameMapping5() throws Exception {
         TestLoader.partFilter = null;
-        lpTester.buildPlan("a = load 'foo' using "
+        String q = "a = load 'foo' using "
             + TestLoader.class.getName() + 
             "('mrkt:chararray, a1:chararray, a2:chararray, srcid:int, bcookie:chararray', " +
-            "'srcid');");
-        lpTester.buildPlan("b = load 'bar' using "
-                + TestLoader.class.getName() + 
-                "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
-                "'srcid');");
-        lpTester.buildPlan("a1 = filter a by srcid == 10;");
-        lpTester.buildPlan("b1 = filter b by srcid == 20;");
-        lpTester.buildPlan("c = join a1 by bcookie, b1 by bcookie;");
-        org.apache.pig.impl.logicalLayer.LogicalPlan lp = lpTester
-                .buildPlan("d = foreach c generate $4 as bcookie:chararray, " +
-                		"$5 as dstid:int, $0 as mrkt:chararray;");
-        
-        new PlanSetter(lp).visit();
-        
-        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( lp );
-        
+            "'srcid');" +
+            "b = load 'bar' using "
+            + TestLoader.class.getName() + 
+            "('dstid:int, b1:int, b2:int, srcid:int, bcookie:chararray, mrkt:chararray'," +
+            "'srcid');" +
+            "a1 = filter a by srcid == 10;" +
+            "b1 = filter b by srcid == 20;"+
+            "c = join a1 by bcookie, b1 by bcookie;" +
+            "d = foreach c generate $4 as bcookie:chararray, " +
+            "$5 as dstid:int, $0 as mrkt:chararray;" +
+            "store d into 'out';";
+
+        LogicalPlan newLogicalPlan = migrateAndOptimizePlan( q );
+
         String partFilter = TestLoader.partFilter.toString();
         Assert.assertTrue( "(srcid == 20)".equals( partFilter ) ||  "(srcid == 10)".equals( partFilter ) );
-        
+
         int counter = 0;
         Iterator<Operator> iter = newLogicalPlan.getOperators();
         while (iter.hasNext()) {
-        	 Assert.assertTrue(!(iter.next() instanceof LOFilter));
+            Assert.assertTrue(!(iter.next() instanceof LOFilter));
             counter++;
         }      
-        Assert.assertEquals(counter, 6);
+        Assert.assertEquals(counter, 7);
     }
-    
+
     //// helper methods ///////
-    
-    private PColFilterExtractor test(org.apache.pig.impl.logicalLayer.LogicalPlan lp, List<String> partitionCols, 
+
+    private PColFilterExtractor test(String query, List<String> partitionCols, 
             String expPartFilterString, String expFilterString) 
-    throws IOException {
-    	LogicalPlan newLogicalPlan = migratePlan( lp );
-        LOFilter filter = (LOFilter)newLogicalPlan.getSinks().get(0);
+    throws Exception {
+        PigServer pigServer = new PigServer( pc );
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
         PColFilterExtractor pColExtractor = new PColFilterExtractor(
                 filter.getFilterPlan(), partitionCols);
         pColExtractor.visit();
-        
+
         if(expPartFilterString == null) {
-        	 Assert.assertEquals("Checking partition column filter:", null, 
+            Assert.assertEquals("Checking partition column filter:", null, 
                     pColExtractor.getPColCondition());
         } else  {
-        	 Assert.assertEquals("Checking partition column filter:", 
+            Assert.assertEquals("Checking partition column filter:", 
                     expPartFilterString.toLowerCase(), 
                     pColExtractor.getPColCondition().toString().toLowerCase());   
         }
-        
+
         if(expFilterString == null) {
-        	 Assert.assertTrue("Check that filter can be removed:", 
+            Assert.assertTrue("Check that filter can be removed:", 
                     pColExtractor.isFilterRemovable());
         } else {
             String actual = pColExtractor.getExpression(
-                                (LogicalExpression)filter.getFilterPlan().getSources().get(0)).
-                                toString().toLowerCase();
+                    (LogicalExpression)filter.getFilterPlan().getSources().get(0)).
+                    toString().toLowerCase();
             Assert.assertEquals("checking trimmed filter expression:", expFilterString,
                     actual);
         }
         return pColExtractor;
     }
-    
-    private void negativeTest(org.apache.pig.impl.logicalLayer.LogicalPlan lp,
-            List<String> partitionCols) throws FrontendException {
-        LogicalPlan newLogicalPlan = migratePlan(lp);
-        LOFilter filter = (LOFilter) newLogicalPlan.getSinks().get(0);
-
-        LogicalExpressionPlan plan = filter.getFilterPlan();
-
-        PColFilterExtractor pColExtractor = new PColFilterExtractor(plan,
-                partitionCols);
 
-        pColExtractor.visit();
-
-        Assert.assertTrue(!pColExtractor.canPushDown());
-    }
-    
-    private void negativeTest(org.apache.pig.impl.logicalLayer.LogicalPlan lp)
-            throws FrontendException {
-        LogicalPlan plan = migratePlan(lp);
- 
-        LOFilter filter = (LOFilter) plan.getSinks().get(0);
-        
-        Rule rule = new PartitionFilterOptimizer("test");
-        List<OperatorPlan> matches = rule.match(plan);
-        if (matches != null) {
-            Transformer transformer = rule.getNewTransformer();
-            for (OperatorPlan m : matches) {
-                if (transformer.check(m)) {
-                    transformer.transform(m);
-                }
-            }
-            OperatorSubPlan newPlan = (OperatorSubPlan)transformer.reportChanges();
-
-            LOFilter filter2 = (LOFilter) newPlan.getBasePlan().getSinks().get(0);
-            
-            Assert.assertTrue(filter.isEqual(filter2));
-            Assert.assertTrue(newPlan.getBasePlan().isEqual(plan));
+    private void negativeTest(String query, List<String> partitionCols,
+            int expectedErrorCode) throws Exception {
+        PigServer pigServer = new PigServer( pc );
+        LogicalPlan newLogicalPlan = Util.buildLp(pigServer, query);
+        Operator op = newLogicalPlan.getSinks().get(0);
+        LOFilter filter = (LOFilter)newLogicalPlan.getPredecessors(op).get(0);
+        PColFilterExtractor pColExtractor = new PColFilterExtractor(
+                filter.getFilterPlan(), partitionCols);
+        try {
+            pColExtractor.visit();
+        } catch(Exception e) {
+            Assert.assertEquals("Checking if exception has right error code", 
+                    expectedErrorCode, LogUtils.getPigException(e).getErrorCode());
+            return;
         }
     }
-    
+
     /**
      * this loader is only used to test that parition column filters are given
      * in the manner expected in terms of column names - hence it does not
@@ -670,13 +632,13 @@ public class TestPartitionFilterPushDown
         Schema schema;
         String[] partCols;
         static Expression partFilter = null;
-        
+
         public TestLoader(String schemaString, String commaSepPartitionCols) 
         throws ParseException {
             schema = Util.getSchemaFromString(schemaString);
             partCols = commaSepPartitionCols.split(",");
         }
-        
+
         @Override
         public InputFormat getInputFormat() throws IOException {
             return null;
@@ -689,7 +651,7 @@ public class TestPartitionFilterPushDown
 
         @Override
         public void prepareToRead(RecordReader reader, PigSplit split)
-                throws IOException {
+        throws IOException {
         }
 
         @Override
@@ -698,13 +660,13 @@ public class TestPartitionFilterPushDown
 
         @Override
         public String[] getPartitionKeys(String location, Job job)
-                throws IOException {
+        throws IOException {
             return partCols;
         }
 
         @Override
         public ResourceSchema getSchema(String location, Job job)
-                throws IOException {
+        throws IOException {
             return new ResourceSchema(schema);
         }
 
@@ -716,40 +678,42 @@ public class TestPartitionFilterPushDown
 
         @Override
         public void setPartitionFilter(Expression partitionFilter)
-                throws IOException {
+        throws IOException {
             partFilter = partitionFilter;            
         }
-        
+
     }
 
     public class MyPlanOptimizer extends LogicalPlanOptimizer {
         protected MyPlanOptimizer(OperatorPlan p,  int iterations) {
             super( p, iterations, new HashSet<String>() );
         }
-        
+
         protected List<Set<Rule>> buildRuleSets() {            
             List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
-            
+
             Set<Rule> s = new HashSet<Rule>();
             // add split filter rule
             Rule r = new PartitionFilterOptimizer("PartitionFilterPushDown");
             s = new HashSet<Rule>();
             s.add(r);            
             ls.add(s);
-            
+
             r = new LoadTypeCastInserter( "LoadTypeCastInserter" );
             s = new HashSet<Rule>();
             s.add(r);
             ls.add(s);
             return ls;
         }
-    }    
+    }
 
-    private LogicalPlan migratePlan(org.apache.pig.impl.logicalLayer.LogicalPlan lp) throws VisitorException{
-        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
-        visitor.visit();
-        LogicalPlan newPlan = visitor.getNewLogicalPlan();
-        return newPlan;
+    // Helper Functions
+    public LogicalPlan buildPlan(PigServer pigServer, String query) throws Exception {
+    	try {
+            return Util.buildLp(pigServer, query);
+    	} catch(Throwable t) {
+    		throw new AssertionFailedError(t.getMessage());
+    	}
     }
-    
+
 }

Modified: pig/branches/branch-0.9/test/org/apache/pig/test/TestPigScriptParser.java
URL: http://svn.apache.org/viewvc/pig/branches/branch-0.9/test/org/apache/pig/test/TestPigScriptParser.java?rev=1099123&r1=1099122&r2=1099123&view=diff
==============================================================================
--- pig/branches/branch-0.9/test/org/apache/pig/test/TestPigScriptParser.java (original)
+++ pig/branches/branch-0.9/test/org/apache/pig/test/TestPigScriptParser.java Tue May  3 16:58:19 2011
@@ -36,8 +36,13 @@ import org.apache.pig.ExecType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.logicalLayer.* ;
 import org.apache.pig.impl.logicalLayer.parser.* ;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
 
 public class TestPigScriptParser extends TestCase {
 
@@ -45,37 +50,30 @@ public class TestPigScriptParser extends
     public void testParserWithEscapeCharacters() throws Exception {
 
         // All the needed variables
-        Map<LogicalOperator, LogicalPlan> aliases = new HashMap<LogicalOperator, LogicalPlan>();
-        Map<OperatorKey, LogicalOperator> opTable = new HashMap<OperatorKey, LogicalOperator>() ;
-        Map<String, LogicalOperator> aliasOp = new HashMap<String, LogicalOperator>() ;
-        Map<String, String> fileNameMap = new HashMap<String, String>();
         PigContext pigContext = new PigContext(ExecType.LOCAL, new Properties()) ;
+        PigServer pigServer = new PigServer( pigContext );
         pigContext.connect();
         
         String tempFile = this.prepareTempFile() ;
         
+    	String query = String.format("A = LOAD '%s' ;", Util.encodeEscape(tempFile)) ;
         // Start the real parsing job
         {
-
         	// Initial statement
-        	String query = String.format("A = LOAD '%s' ;", Util.encodeEscape(tempFile)) ;
-        	ByteArrayInputStream in = new ByteArrayInputStream(query.getBytes()); 
-        	QueryParser parser = new QueryParser(in, pigContext, "scope", aliases, opTable, aliasOp, fileNameMap) ;
-        	LogicalPlan lp = parser.Parse() ; 
+        	Util.buildLp(pigServer, query); 
         }
         
         {
         	// Normal condition
-        	String query = "B1 = filter A by $0 eq 'This is a test string' ;" ;
-        	checkParsedConstContent(aliases, opTable, pigContext, aliasOp, fileNameMap,
-        	                        query, "This is a test string") ;	
+        	String q = query + "B = filter A by $0 eq 'This is a test string' ;" ;
+        	checkParsedConstContent(pigServer, pigContext, q, "This is a test string") ;	
         }
         
         {
         	// single-quote condition
-        	String query = "B2 = filter A by $0 eq 'This is a test \\'string' ;" ;
-        	checkParsedConstContent(aliases, opTable, pigContext, aliasOp, fileNameMap,
-        	                        query, "This is a test 'string") ;	
+        	String q = query + "B = filter A by $0 eq 'This is a test \\'string' ;" ;
+        	checkParsedConstContent(pigServer, pigContext,
+        	                        q, "This is a test 'string") ;	
         }
         
         {
@@ -84,23 +82,23 @@ public class TestPigScriptParser extends
             // since this is to be represented in a Java String, we escape each backslash with one more
             // backslash - hence 4. In a pig script in a file, this would be
             // \\.string
-            String query = "B2 = filter A by $0 eq 'This is a test \\\\.string' ;" ;
-            checkParsedConstContent(aliases, opTable, pigContext, aliasOp, fileNameMap,
-                                    query, "This is a test \\.string") ;  
+            String q = query + "B = filter A by $0 eq 'This is a test \\\\.string' ;" ;
+            checkParsedConstContent(pigServer, pigContext,
+                                    q, "This is a test \\.string") ;  
         }
         
         {
         	// newline condition
-        	String query = "B3 = filter A by $0 eq 'This is a test \\nstring' ;" ;
-        	checkParsedConstContent(aliases, opTable, pigContext, aliasOp, fileNameMap, 
-        	                        query, "This is a test \nstring") ;	
+        	String q = query + "B = filter A by $0 eq 'This is a test \\nstring' ;" ;
+        	checkParsedConstContent(pigServer, pigContext, 
+        	                        q, "This is a test \nstring") ;	
         }
         
         {
         	// Unicode
-        	String query = "B4 = filter A by $0 eq 'This is a test \\uD30C\\uC774string' ;" ;
-        	checkParsedConstContent(aliases, opTable, pigContext, aliasOp, fileNameMap,
-        	                        query, "This is a test \uD30C\uC774string") ;	
+        	String q = query + "B = filter A by $0 eq 'This is a test \\uD30C\\uC774string' ;" ;
+        	checkParsedConstContent(pigServer, pigContext,
+        	                        q, "This is a test \uD30C\uC774string") ;	
         }
     }
     
@@ -135,38 +133,31 @@ public class TestPigScriptParser extends
         }
     }
     
-	private void checkParsedConstContent(Map<LogicalOperator, LogicalPlan> aliases,
-                                             Map<OperatorKey, LogicalOperator> opTable,
+	private void checkParsedConstContent(PigServer pigServer,
                                              PigContext pigContext,
-                                             Map<String, LogicalOperator> aliasOp,
-                                             Map<String, String> fileNameMap,
                                              String query,
                                              String expectedContent)
                                         throws Exception {
-        // Run the parser
-        ByteArrayInputStream in = new ByteArrayInputStream(query.getBytes()); 
-        QueryParser parser = new QueryParser(in, pigContext, "scope", aliases, opTable, aliasOp,
-                                             fileNameMap) ;
-        LogicalPlan lp = parser.Parse() ; 
-        
+        pigContext.connect();
+        LogicalPlan lp = Util.buildLp(pigServer, query + "store B into 'output';");
         // Digging down the tree
-        LogicalOperator root = lp.getRoots().get(0) ;
-        LogicalOperator filter = lp.getSuccessors(root).get(0);
-        LogicalPlan comparisonPlan = ((LOFilter)filter).getComparisonPlan();
-        List<LogicalOperator> comparisonPlanRoots = comparisonPlan.getRoots();
-        LogicalOperator compRootOne = comparisonPlanRoots.get(0);
-        LogicalOperator compRootTwo = comparisonPlanRoots.get(1);
+        Operator load = lp.getSources().get(0);
+        Operator filter = lp.getSuccessors( load ).get(0);
+        LogicalExpressionPlan comparisonPlan = ((LOFilter)filter).getFilterPlan();
+        List<Operator> comparisonPlanRoots = comparisonPlan.getSinks();
+        Operator compRootOne = comparisonPlanRoots.get(0);
+        Operator compRootTwo = comparisonPlanRoots.get(1);
 
         
         // Here is the actual check logic
-        if (compRootOne instanceof LOConst) {
+        if (compRootOne instanceof ConstantExpression) {
             assertTrue("Must be equal", 
-                        ((String)((LOConst)compRootOne).getValue()).equals(expectedContent)) ;
+                        ((String)((ConstantExpression)compRootOne).getValue()).equals(expectedContent)) ;
         } 
         // If not left, it must be right.
         else {
             assertTrue("Must be equal", 
-                        ((String)((LOConst)compRootTwo).getValue()).equals(expectedContent)) ;
+                        ((String)((ConstantExpression)compRootTwo).getValue()).equals(expectedContent)) ;
         }
     }