You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/02/11 23:12:43 UTC

svn commit: r909165 [6/6] - in /hadoop/pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logica...

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java Thu Feb 11 22:12:36 2010
@@ -22,6 +22,7 @@
 import java.util.Collection;
 import java.util.List;
 
+import org.apache.pig.FuncSpec;
 import org.apache.pig.data.DataType;
 import org.apache.pig.experimental.logical.expression.AndExpression;
 import org.apache.pig.experimental.logical.expression.ConstantExpression;
@@ -29,10 +30,14 @@
 import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
 import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
 import org.apache.pig.experimental.logical.relational.LOLoad;
 import org.apache.pig.experimental.logical.relational.LogicalPlan;
 import org.apache.pig.experimental.logical.relational.LogicalPlanVisitor;
 import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE;
 import org.apache.pig.experimental.plan.BaseOperatorPlan;
 import org.apache.pig.experimental.plan.DependencyOrderWalker;
 import org.apache.pig.experimental.plan.DepthFirstWalker;
@@ -42,6 +47,8 @@
 import org.apache.pig.experimental.plan.PlanVisitor;
 import org.apache.pig.experimental.plan.PlanWalker;
 import org.apache.pig.experimental.plan.ReverseDependencyOrderWalker;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
 import org.junit.Test;
 
@@ -54,7 +61,6 @@
         SillyPlan() {
             super();
         }
-
     }
     
     private static class SillyOperator extends Operator {
@@ -64,10 +70,6 @@
             super(n, p);
             name = n;
         }
-        
-        public boolean equals(SillyOperator other) {
-            return other.name == name;
-        }
 
         @Override
         public void accept(PlanVisitor v) {
@@ -75,6 +77,11 @@
                 ((SillyVisitor)v).visitSillyOperator(this);
             }
         }
+
+        @Override
+        public boolean isEqual(Operator operator) {
+            return ( name.compareTo(operator.getName()) == 0 );
+        }
     }
     
     private static class SillyVisitor extends PlanVisitor {
@@ -173,9 +180,9 @@
         
         // Test that roots and leaves are empty when there are no operators in
         // plan.
-        List<Operator> list = plan.getRoots();
+        List<Operator> list = plan.getSources();
         assertEquals(0, list.size());
-        list = plan.getLeaves();
+        list = plan.getSinks();
         assertEquals(0, list.size());
         
         plan.add(fred);
@@ -185,9 +192,9 @@
         plan.add(sam);
         
         // Test that when not connected all nodes are roots and leaves.
-        list = plan.getRoots();
+        list = plan.getSources();
         assertEquals(5, list.size());
-        list = plan.getLeaves();
+        list = plan.getSinks();
         assertEquals(5, list.size());
         
         // Connect them up
@@ -197,15 +204,15 @@
         plan.connect(bob, sam);
         
         // Check that the roots and leaves came out right
-        list = plan.getRoots();
+        list = plan.getSources();
         assertEquals(2, list.size());
         for (Operator op : list) {
-            assertTrue(fred.equals(op) || joe.equals(op));
+            assertTrue(fred.isEqual(op) || joe.isEqual(op));
         }
-        list = plan.getLeaves();
+        list = plan.getSinks();
         assertEquals(2, list.size());
         for (Operator op : list) {
-            assertTrue(jim.equals(op) || sam.equals(op));
+            assertTrue(jim.isEqual(op) || sam.isEqual(op));
         }
         
         // Check each of their successors and predecessors
@@ -243,15 +250,15 @@
         plan.connect(jim, p2.first, bob, p2.second);
         
          // Check that the roots and leaves came out right
-        list = plan.getRoots();
+        list = plan.getSources();
         assertEquals(2, list.size());
         for (Operator op : list) {
-            assertTrue(jim.equals(op) || joe.equals(op));
+            assertTrue(jim.isEqual(op) || joe.isEqual(op));
         }
-        list = plan.getLeaves();
+        list = plan.getSinks();
         assertEquals(2, list.size());
         for (Operator op : list) {
-            assertTrue(fred.equals(op) || sam.equals(op));
+            assertTrue(fred.isEqual(op) || sam.isEqual(op));
         }
         
         // Check each of their successors and predecessors
@@ -299,9 +306,9 @@
         plan.remove(bob);
         plan.disconnect(fred, joe);
         
-        List<Operator> list = plan.getRoots();
+        List<Operator> list = plan.getSources();
         assertEquals(2, list.size());
-        list = plan.getLeaves();
+        list = plan.getSinks();
         assertEquals(2, list.size());
         
         plan.remove(fred);
@@ -309,9 +316,9 @@
         
         assertEquals(0, plan.size());
         
-        list = plan.getRoots();
+        list = plan.getSources();
         assertEquals(0, list.size());
-        list = plan.getLeaves();
+        list = plan.getSinks();
         assertEquals(0, list.size());
     }
     
@@ -744,5 +751,751 @@
         v.visit();
         assertEquals("and equal project constant constant ", v.getVisitPlan());
     }
+    
+    @Test
+    public void testExpressionEquality() {
+        LogicalExpressionPlan ep1 = new LogicalExpressionPlan();
+        ConstantExpression c1 = new ConstantExpression(ep1, DataType.INTEGER, new Integer(5));
+        ProjectExpression p1 = new ProjectExpression(ep1, DataType.INTEGER, 0, 0);
+        EqualExpression e1 = new EqualExpression(ep1, p1, c1);
+        ConstantExpression ca1 = new ConstantExpression(ep1, DataType.BOOLEAN, new Boolean("true"));
+        AndExpression a1 = new AndExpression(ep1, e1, ca1);
+        
+        LogicalExpressionPlan ep2 = new LogicalExpressionPlan();
+        ConstantExpression c2 = new ConstantExpression(ep2, DataType.INTEGER, new Integer(5));
+        ProjectExpression p2 = new ProjectExpression(ep2, DataType.INTEGER, 0, 0);
+        EqualExpression e2 = new EqualExpression(ep2, p2, c2);
+        ConstantExpression ca2 = new ConstantExpression(ep2, DataType.BOOLEAN, new Boolean("true"));
+        AndExpression a2 = new AndExpression(ep2, e2, ca2);
+        
+        assertTrue(ep1.isEqual(ep2));
+        assertTrue(c1.isEqual(c2));
+        assertTrue(p1.isEqual(p2));
+        assertTrue(e1.isEqual(e2));
+        assertTrue(ca1.isEqual(ca2));
+        assertTrue(a1.isEqual(a2));
+        
+        LogicalExpressionPlan ep3 = new LogicalExpressionPlan();
+        ConstantExpression c3 = new ConstantExpression(ep3, DataType.INTEGER, new Integer(3));
+        ProjectExpression p3 = new ProjectExpression(ep3, DataType.INTEGER, 0, 1);
+        EqualExpression e3 = new EqualExpression(ep3, p3, c3);
+        ConstantExpression ca3 = new ConstantExpression(ep3, DataType.CHARARRAY, "true");
+        AndExpression a3 = new AndExpression(ep3, e3, ca3);
+        
+        assertFalse(ep1.isEqual(ep3));
+        assertFalse(c1.isEqual(c3));
+        assertFalse(p1.isEqual(p3));
+        assertFalse(e1.isEqual(e3));
+        assertFalse(ca1.isEqual(ca3));
+        assertFalse(a1.isEqual(a3));
+        
+        LogicalExpressionPlan ep4 = new LogicalExpressionPlan();
+        ProjectExpression p4 = new ProjectExpression(ep4, DataType.INTEGER, 1, 0);
+        
+        assertFalse(ep1.isEqual(ep4));
+        assertFalse(p1.isEqual(p4));
+    }
+    
+    @Test
+    public void testRelationalEquality() throws IOException {
+        // Build a plan that is the logical plan for
+        // A = load 'bla' as (x);
+        // B = load 'morebla' as (y);
+        // C = join A on x, B on y;
+        // D = filter C by y == 0;
+        
+        // A = load
+        LogicalPlan lp = new LogicalPlan();
+        {
+            LogicalSchema aschema = new LogicalSchema();
+            aschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "x", null, DataType.INTEGER));
+            LOLoad A = new LOLoad(new FileSpec("/abc",
+                new FuncSpec("/fooload", new String[] {"x", "y"})), aschema, lp);
+            lp.add(A);
+        
+            // B = load
+            LogicalSchema bschema = new LogicalSchema();
+            bschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "y", null, DataType.INTEGER));
+            LOLoad B = new LOLoad(new FileSpec("/def",
+                new FuncSpec("PigStorage", "\t")), bschema, lp);
+            lp.add(B);
+        
+            // C = join
+            LogicalSchema cschema = new LogicalSchema();
+            cschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "x", null, DataType.INTEGER));
+            cschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "y", null, DataType.INTEGER));
+            LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+            new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0);
+            LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+            new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0);
+            MultiMap<Integer, LogicalExpressionPlan> mm = 
+                new MultiMap<Integer, LogicalExpressionPlan>();
+            mm.put(0, aprojplan);
+            mm.put(1, bprojplan);
+            LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true});
+            C.neverUseForRealSetSchema(cschema);
+            lp.add(new LogicalRelationalOperator[] {A, B}, C, null);
+            
+            // D = filter
+            LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+            ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1);
+            ConstantExpression fc = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(0));
+            new EqualExpression(filterPlan, fy, fc);
+            LOFilter D = new LOFilter(lp, filterPlan);
+            D.neverUseForRealSetSchema(cschema);
+            lp.add(C, D, (LogicalRelationalOperator)null);
+        }
+        
+        // Build a second, identical plan to test equality
+        // A = load
+        LogicalPlan lp1 = new LogicalPlan();
+        {
+            LogicalSchema aschema = new LogicalSchema();
+            aschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "x", null, DataType.INTEGER));
+            LOLoad A = new LOLoad(new FileSpec("/abc",
+                new FuncSpec("/fooload", new String[] {"x", "y"})), aschema, lp1);
+            lp1.add(A);
+            
+            // B = load
+            LogicalSchema bschema = new LogicalSchema();
+            bschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "y", null, DataType.INTEGER));
+            LOLoad B = new LOLoad(new FileSpec("/def",
+                new FuncSpec("PigStorage", "\t")), bschema, lp1);
+            lp1.add(B);
+            
+            // C = join
+            LogicalSchema cschema = new LogicalSchema();
+            cschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "x", null, DataType.INTEGER));
+            cschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "y", null, DataType.INTEGER));
+            LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+            new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0);
+            LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+            new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0);
+            MultiMap<Integer, LogicalExpressionPlan> mm = 
+                new MultiMap<Integer, LogicalExpressionPlan>();
+            mm.put(0, aprojplan);
+            mm.put(1, bprojplan);
+            LOJoin C = new LOJoin(lp1, mm, JOINTYPE.HASH, new boolean[] {true, true});
+            C.neverUseForRealSetSchema(cschema);
+            lp1.add(new LogicalRelationalOperator[] {A, B}, C, null);
+            
+            // D = filter
+            LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+            ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1);
+            ConstantExpression fc = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(0));
+            new EqualExpression(filterPlan, fy, fc);
+            LOFilter D = new LOFilter(lp1, filterPlan);
+            D.neverUseForRealSetSchema(cschema);
+            lp1.add(C, D, (LogicalRelationalOperator)null);
+        }
+        
+        assertTrue( lp.isEqual(lp1));
+    }
+    
+    @Test
+    public void testLoadEqualityDifferentFuncSpecCtorArgs() {
+        LogicalPlan lp = new LogicalPlan();
+        
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+        lp.add(load1);
+        
+        LOLoad load2 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "z"})), aschema1, lp);
+        lp.add(load2);
+        
+        assertFalse(load1.isEqual(load2));
+    }
+    
+    @Test
+    public void testLoadEqualityDifferentNumFuncSpecCstorArgs() {
+        LogicalPlan lp = new LogicalPlan();
+        
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+        lp.add(load1);
+        
+        LOLoad load3 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", "x")), aschema1, lp);
+        lp.add(load3);
+        
+        assertFalse(load1.isEqual(load3));
+    }
+    
+    @Test
+    public void testLoadEqualityDifferentFunctionNames() {
+        LogicalPlan lp = new LogicalPlan();
+        
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+        lp.add(load1);
+        
+         // Different function names in FuncSpec
+        LOLoad load4 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foobar", new String[] {"x", "z"})), aschema1, lp);
+        lp.add(load4);
+        
+        assertFalse(load1.isEqual(load4));
+    }
+    
+    @Test
+    public void testLoadEqualityDifferentFileName() {
+        LogicalPlan lp = new LogicalPlan();
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+        lp.add(load1);
+    
+        // Different file name
+        LOLoad load5 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("foo", new String[] {"x", "z"})), aschema1, lp);
+        lp.add(load5);
+        
+        assertFalse(load1.isEqual(load5));
+    }
+    
+    @Test
+    public void testRelationalEqualityDifferentSchema() {
+        LogicalPlan lp = new LogicalPlan();
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+        lp.add(load1);
+        
+        // Different schema
+        LogicalSchema aschema2 = new LogicalSchema();
+        aschema2.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.CHARARRAY));
+        
+        LOLoad load6 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "z"})), aschema2, lp);
+        lp.add(load6);
+            
+        assertFalse(load1.isEqual(load6));
+    }
+    
+    @Test
+    public void testRelationalEqualityNullSchemas() {
+        LogicalPlan lp = new LogicalPlan();
+        // Test that two loads with no schema are still equal
+        LOLoad load7 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), null, lp);
+        lp.add(load7);
+        
+        LOLoad load8 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), null, lp);
+        lp.add(load8);
+        
+        assertTrue(load7.isEqual(load8));
+    }
+    
+    @Test
+    public void testRelationalEqualityOneNullOneNotNullSchema() {
+        LogicalPlan lp = new LogicalPlan();
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp);
+        lp.add(load1);
+        
+        // Test that a load with a schema and a load without one are not equal
+        LOLoad load9 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "z"})), null, lp);
+        lp.add(load9);
+        
+        assertFalse(load1.isEqual(load9));
+    }
+        
+    @Test
+    public void testFilterDifferentPredicates() {
+        LogicalPlan lp = new LogicalPlan();
+            
+        LogicalExpressionPlan fp1 = new LogicalExpressionPlan();
+        ProjectExpression fy1 = new ProjectExpression(fp1, DataType.INTEGER, 0, 1);
+        ConstantExpression fc1 = new ConstantExpression(fp1, DataType.INTEGER,
+            new Integer(0));
+        new EqualExpression(fp1, fy1, fc1);
+        LOFilter D1 = new LOFilter(lp, fp1);
+        LogicalSchema cschema = new LogicalSchema();
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        D1.neverUseForRealSetSchema(cschema);
+        lp.add(D1);
+        
+        LogicalExpressionPlan fp2 = new LogicalExpressionPlan();
+        ProjectExpression fy2 = new ProjectExpression(fp2, DataType.INTEGER, 0, 1);
+        ConstantExpression fc2 = new ConstantExpression(fp2, DataType.INTEGER,
+            new Integer(1));
+        new EqualExpression(fp2, fy2, fc2);
+        LOFilter D2 = new LOFilter(lp, fp2);
+        D2.neverUseForRealSetSchema(cschema);
+        lp.add(D2);
+        
+        assertFalse(D1.isEqual(D2));
+    }
+        
+    // No tests for LOStore because it tries to actually instantiate the store
+    // func, and I don't want to mess with that here.
+    
+    @Test
+    public void testJoinDifferentJoinTypes() throws IOException {
+       LogicalPlan lp = new LogicalPlan();
+       LogicalSchema jaschema1 = new LogicalSchema();
+       jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+       LOLoad A1 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema1, lp);
+       lp.add(A1);
+        
+        // B = load
+        LogicalSchema jbschema1 = new LogicalSchema();
+        jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B1 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema1, lp);
+        lp.add(B1);
+        
+        // C = join
+        LogicalSchema jcschema1 = new LogicalSchema();
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan1, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan1, DataType.INTEGER, 1, 0);
+        MultiMap<Integer, LogicalExpressionPlan> mm1 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm1.put(0, aprojplan1);
+        mm1.put(1, bprojplan1);
+        LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true});
+        C1.neverUseForRealSetSchema(jcschema1);
+        lp.add(new LogicalRelationalOperator[] {A1, B1}, C1, null);
+        
+        // A = load
+        LogicalSchema jaschema2 = new LogicalSchema();
+        jaschema2.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A2 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema2, lp);
+        lp.add(A2);
+        
+        // B = load
+        LogicalSchema jbschema2 = new LogicalSchema();
+        jbschema2.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B2 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema2, lp);
+        lp.add(B2);
+        
+        // C = join
+        LogicalSchema jcschema2 = new LogicalSchema();
+        jcschema2.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema2.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan2 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan2, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan2 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan2, DataType.INTEGER, 1, 0);
+        MultiMap<Integer, LogicalExpressionPlan> mm2 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm2.put(0, aprojplan2);
+        mm2.put(1, bprojplan2);
+        LOJoin C2 = new LOJoin(lp, mm2, JOINTYPE.SKEWED, new boolean[] {true, true});
+        C2.neverUseForRealSetSchema(jcschema2);
+        lp.add(new LogicalRelationalOperator[] {A2, B2}, C2, null);
+        
+        assertFalse(C1.isEqual(C2));
+    }
+    
+    @Test
+    public void testJoinDifferentInner() throws IOException {
+        LogicalPlan lp = new LogicalPlan();
+               LogicalSchema jaschema1 = new LogicalSchema();
+       jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+       LOLoad A1 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema1, lp);
+       lp.add(A1);
+        
+        // B = load
+        LogicalSchema jbschema1 = new LogicalSchema();
+        jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B1 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema1, lp);
+        lp.add(B1);
+        
+        // C = join
+        LogicalSchema jcschema1 = new LogicalSchema();
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan1, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan1, DataType.INTEGER, 1, 0);
+        MultiMap<Integer, LogicalExpressionPlan> mm1 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm1.put(0, aprojplan1);
+        mm1.put(1, bprojplan1);
+        LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true});
+        C1.neverUseForRealSetSchema(jcschema1);
+        lp.add(new LogicalRelationalOperator[] {A1, B1}, C1, null);
+ 
+        // Test different inner status
+        // A = load
+        LogicalSchema jaschema3 = new LogicalSchema();
+        jaschema3.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A3 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema3, lp);
+        lp.add(A3);
+        
+        // B = load
+        LogicalSchema jbschema3 = new LogicalSchema();
+        jbschema3.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B3 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema3, lp);
+        lp.add(B3);
+        
+        // C = join
+        LogicalSchema jcschema3 = new LogicalSchema();
+        jcschema3.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema3.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan3 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan3, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan3 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan3, DataType.INTEGER, 1, 0);
+        MultiMap<Integer, LogicalExpressionPlan> mm3 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm3.put(0, aprojplan3);
+        mm3.put(1, bprojplan3);
+        LOJoin C3 = new LOJoin(lp, mm3, JOINTYPE.HASH, new boolean[] {true, false});
+        C3.neverUseForRealSetSchema(jcschema3);
+        lp.add(new LogicalRelationalOperator[] {A3, B3}, C3, null);
+        
+        assertFalse(C1.isEqual(C3));
+    }
+ 
+    @Test
+    public void testJoinDifferentNumInputs() throws IOException {
+        LogicalPlan lp = new LogicalPlan();
+               LogicalSchema jaschema1 = new LogicalSchema();
+       jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+       LOLoad A1 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema1, lp);
+       lp.add(A1);
+        
+        // B = load
+        LogicalSchema jbschema1 = new LogicalSchema();
+        jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B1 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema1, lp);
+        lp.add(B1);
+        
+        // C = join
+        LogicalSchema jcschema1 = new LogicalSchema();
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan1, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan1, DataType.INTEGER, 1, 0);
+        MultiMap<Integer, LogicalExpressionPlan> mm1 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm1.put(0, aprojplan1);
+        mm1.put(1, bprojplan1);
+        LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true});
+        C1.neverUseForRealSetSchema(jcschema1);
+        lp.add(new LogicalRelationalOperator[] {A1, B1}, C1, null);
+ 
+        // A = load
+        LogicalSchema jaschema5 = new LogicalSchema();
+        jaschema5.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A5 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema5, lp);
+        lp.add(A5);
+        
+        // B = load
+        LogicalSchema jbschema5 = new LogicalSchema();
+        jbschema5.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B5 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema5, lp);
+        lp.add(B5);
+        
+        // Beta = load
+        LogicalSchema jbetaschema5 = new LogicalSchema();
+        jbetaschema5.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad Beta5 = new LOLoad(new FileSpec("/ghi",
+            new FuncSpec("PigStorage", "\t")), jbetaschema5, lp);
+        lp.add(Beta5);
+        
+        // C = join
+        LogicalSchema jcschema5 = new LogicalSchema();
+        jcschema5.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema5.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan5 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan5, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan5 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan5, DataType.INTEGER, 1, 0);
+        LogicalExpressionPlan betaprojplan5 = new LogicalExpressionPlan();
+        new ProjectExpression(betaprojplan5, DataType.INTEGER, 1, 0);
+        MultiMap<Integer, LogicalExpressionPlan> mm5 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm5.put(0, aprojplan5);
+        mm5.put(1, bprojplan5);
+        mm5.put(2, betaprojplan5);
+        LOJoin C5 = new LOJoin(lp, mm5, JOINTYPE.HASH, new boolean[] {true, true});
+        C5.neverUseForRealSetSchema(jcschema5);
+        lp.add(new LogicalRelationalOperator[] {A5, B5, Beta5}, C5, null);
+        
+        assertFalse(C1.isEqual(C5));
+    }
+        
+    @Test
+    public void testJoinDifferentJoinKeys() throws IOException {
+        LogicalPlan lp = new LogicalPlan();
+        // Purpose: two joins built alike except for the order of input 1's key plans are expected not to be isEqual.
+        // First join (names ending in 6). A = load with one INTEGER field "x"
+        LogicalSchema jaschema6 = new LogicalSchema();
+        jaschema6.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A6 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema6, lp);
+        lp.add(A6);
+        
+        // B = load with fields y:INTEGER and z:LONG
+        LogicalSchema jbschema6 = new LogicalSchema();
+        jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "z", null, DataType.LONG));
+        LOLoad B6 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema6, lp);
+        lp.add(B6);
+        
+        // C = join; input 1's key plans are added as ProjectExpression(..., 1, 0) then (..., 1, 1)
+        LogicalSchema jcschema6 = new LogicalSchema();
+        jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan6, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan6, DataType.INTEGER, 1, 0);
+        LogicalExpressionPlan b2projplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(b2projplan6, DataType.INTEGER, 1, 1);
+        MultiMap<Integer, LogicalExpressionPlan> mm6 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm6.put(0, aprojplan6);
+        mm6.put(1, bprojplan6);
+        mm6.put(1, b2projplan6);
+        LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] {true, true});
+        C6.neverUseForRealSetSchema(jcschema6);
+        lp.add(new LogicalRelationalOperator[] {A6, B6}, C6, null);
+        // Second join (names ending in 7), built in the same plan from equivalent loads and output schema
+        LogicalSchema jaschema7 = new LogicalSchema();
+        jaschema7.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A7 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema7, lp);
+        lp.add(A7);
+        
+        // B = load with fields y:INTEGER and z:LONG
+        LogicalSchema jbschema7 = new LogicalSchema();
+        jbschema7.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        jbschema7.addField(new LogicalSchema.LogicalFieldSchema(
+            "z", null, DataType.LONG));
+        LOLoad B7 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema7, lp);
+        lp.add(B7);
+        
+        // C = join; input 1's key plans are added in the opposite order: (..., 1, 1) then (..., 1, 0)
+        LogicalSchema jcschema7 = new LogicalSchema();
+        jcschema7.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema7.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan7 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan7, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan7 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan7, DataType.INTEGER, 1, 1);
+        LogicalExpressionPlan b2projplan7 = new LogicalExpressionPlan();
+        new ProjectExpression(b2projplan7, DataType.INTEGER, 1, 0);
+        MultiMap<Integer, LogicalExpressionPlan> mm7 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm7.put(0, aprojplan7);
+        mm7.put(1, bprojplan7);
+        mm7.put(1, b2projplan7);
+        LOJoin C7 = new LOJoin(lp, mm7, JOINTYPE.HASH, new boolean[] {true, true});
+        C7.neverUseForRealSetSchema(jcschema7);
+        lp.add(new LogicalRelationalOperator[] {A7, B7}, C7, null);
+        // C6 and C7 were constructed identically apart from the order of input 1's key plans
+        assertFalse(C6.isEqual(C7));
+    }
+    
+    @Test
+    public void testJoinDifferentNumJoinKeys() throws IOException {
+        LogicalPlan lp = new LogicalPlan();
+        // Purpose: a join with two key plans on input 1 is expected not to be isEqual to one with a single key plan.
+        // Baseline join (names ending in 6). A = load with one INTEGER field "x"
+        LogicalSchema jaschema6 = new LogicalSchema();
+        jaschema6.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A6 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema6, lp);
+        lp.add(A6);
+        
+        // B = load with fields y:INTEGER and z:LONG
+        LogicalSchema jbschema6 = new LogicalSchema();
+        jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "z", null, DataType.LONG));
+        LOLoad B6 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema6, lp);
+        lp.add(B6);
+        
+        // C = join; one key plan is registered for input 0 and two for input 1
+        LogicalSchema jcschema6 = new LogicalSchema();
+        jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan6, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan6, DataType.INTEGER, 1, 0);
+        LogicalExpressionPlan b2projplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(b2projplan6, DataType.INTEGER, 1, 1);
+        MultiMap<Integer, LogicalExpressionPlan> mm6 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm6.put(0, aprojplan6);
+        mm6.put(1, bprojplan6);
+        mm6.put(1, b2projplan6);
+        LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] {true, true});
+        C6.neverUseForRealSetSchema(jcschema6);
+        lp.add(new LogicalRelationalOperator[] {A6, B6}, C6, null);
+        
+        // Second join (names ending in 8): same loads and schema, but a different number of join keys
+        LogicalSchema jaschema8 = new LogicalSchema();
+        jaschema8.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A8 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema8, lp);
+        lp.add(A8);
+        
+        // B = load with fields y:INTEGER and z:LONG
+        LogicalSchema jbschema8 = new LogicalSchema();
+        jbschema8.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        jbschema8.addField(new LogicalSchema.LogicalFieldSchema(
+            "z", null, DataType.LONG));
+        LOLoad B8 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema8, lp);
+        lp.add(B8);
+        
+        // C = join; only one key plan is registered for input 1 (no counterpart of b2projplan6)
+        LogicalSchema jcschema8 = new LogicalSchema();
+        jcschema8.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema8.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan8 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan8, DataType.INTEGER, 0, 0);
+        LogicalExpressionPlan bprojplan8 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan8, DataType.INTEGER, 1, 0);
+        MultiMap<Integer, LogicalExpressionPlan> mm8 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm8.put(0, aprojplan8);
+        mm8.put(1, bprojplan8);
+        LOJoin C8 = new LOJoin(lp, mm8, JOINTYPE.HASH, new boolean[] {true, true});
+        C8.neverUseForRealSetSchema(jcschema8);
+        lp.add(new LogicalRelationalOperator[] {A8, B8}, C8, null);
+        // C6 has two key plans for input 1, C8 has one
+        assertFalse(C6.isEqual(C8));
+    }
+    
+    @Test
+    public void testRelationalSameOpDifferentPreds() throws IOException {
+        LogicalPlan lp1 = new LogicalPlan();
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad A1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("/fooload", new String[] {"x", "y"})), aschema1, lp1);
+        lp1.add(A1);
+        // D1: filter on top of A1 whose plan is ProjectExpression(..., 0, 0) == constant 0
+        LogicalExpressionPlan fp1 = new LogicalExpressionPlan();
+        ProjectExpression fy1 = new ProjectExpression(fp1, DataType.INTEGER, 0, 0);
+        ConstantExpression fc1 = new ConstantExpression(fp1, DataType.INTEGER,
+            new Integer(0));
+        new EqualExpression(fp1, fy1, fc1);
+        LOFilter D1 = new LOFilter(lp1, fp1);
+        LogicalSchema cschema = new LogicalSchema();
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        D1.neverUseForRealSetSchema(cschema);
+        lp1.add(A1, D1, (LogicalRelationalOperator)null);
+        // Second plan: its load uses a different FuncSpec ("/foo", {"x", "z"}) and a null schema
+        LogicalPlan lp2 = new LogicalPlan();
+        LOLoad A2 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("/foo", new String[] {"x", "z"})), null, lp2);
+        lp2.add(A2);
+        // D2: same filter expression as D1, and it reuses the same schema object (cschema)
+        LogicalExpressionPlan fp2 = new LogicalExpressionPlan();
+        ProjectExpression fy2 = new ProjectExpression(fp2, DataType.INTEGER, 0, 0);
+        ConstantExpression fc2 = new ConstantExpression(fp2, DataType.INTEGER,
+            new Integer(0));
+        new EqualExpression(fp2, fy2, fc2);
+        LOFilter D2 = new LOFilter(lp2, fp2);
+        D2.neverUseForRealSetSchema(cschema);
+        lp2.add(A2, D2, (LogicalRelationalOperator)null);
+        // The filters were built alike; only their predecessor loads (A1 vs A2) differ
+        assertFalse(D1.isEqual(D2));
+    }
+    
  
 }

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalRule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalRule.java?rev=909165&r1=909164&r2=909165&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalRule.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalRule.java Thu Feb 11 22:12:36 2010
@@ -56,6 +56,11 @@
             super();
         }
 
+        @Override
+        public boolean isEqual(OperatorPlan other) {
+            return false; // constant result: this implementation never reports two plans as equal
+        }
+
     }
     
     private static class OP extends Operator {
@@ -66,6 +71,11 @@
         public void accept(PlanVisitor v) {
             
         }
+
+        @Override
+        public boolean isEqual(Operator operator) {
+            return false; // constant result: this implementation never reports two operators as equal
+        }
     }
     
     private static class OP_Load extends OP {
@@ -100,6 +110,7 @@
 
     
     OperatorPlan plan = null;
+    Operator join;
     
     public void setUp() {
         plan = new SillyPlan();
@@ -134,7 +145,47 @@
         plan.connect(t1, f3);
         plan.connect(t1, f4);
         plan.connect(f3, s1);
-        plan.connect(f4, s2);
+        plan.connect(f4, s2); 
+        
+        join = j1;
+    }
+    
+    
+    public void testMultiNode() throws Exception {    
+        //         load --|-join - filter - filter - split |- filter - store
+        //         load --|      
+        // load -- filter-|
+        Operator l3 = new OP_Load("p3", plan);
+        Operator f5 = new OP_Filter("f5", plan);
+        plan.add(l3);
+        plan.add(f5);
+        plan.connect(l3, f5);
+         // feed the new load -> filter branch into the operator held in the join field
+         plan.connect(f5, join);
+       
+         // pattern to match: a load and a filter that both connect into a join
+         OperatorPlan pattern = new SillyPlan();
+         Operator op1 = new OP_Load("mmm1", pattern);
+         Operator op2 = new OP_Filter("mmm2", pattern);
+         Operator op3 = new OP_Join("mmm3", pattern);
+         pattern.add(op1);
+         pattern.add(op2);
+         pattern.add(op3);
+         pattern.connect(op1, op3);
+         pattern.connect(op2, op3);
+         // expect exactly one match: three operators whose single sink is the join
+         Rule r = new SillyRule("basic", pattern);
+         List<OperatorPlan> l = r.match(plan);
+         assertEquals(1, l.size());
+         OperatorPlan match = l.get(0);
+         assertEquals(match.size(), 3);
+         assertEquals(match.getSinks().size(), 1);
+         assertEquals(match.getSinks().get(0), join);
+         // each of the two sources must be an OP_Load or f5, and they must be different objects
+         assertEquals(match.getSources().size(), 2);
+         assertTrue(match.getSources().get(0).getClass().equals(OP_Load.class) || match.getSources().get(0).equals(f5) );
+         assertTrue(match.getSources().get(1).getClass().equals(OP_Load.class) || match.getSources().get(1).equals(f5) );
+         assertNotSame(match.getSources().get(0), match.getSources().get(1));
     }
     
     public void testSingleNodeMatch() {
@@ -146,11 +197,11 @@
         List<OperatorPlan> l = r.match(plan);
         assertEquals(l.size(), 2);
         
-        Operator m1 = l.get(0).getRoots().get(0);
+        Operator m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("p1") || m1.getName().equals("p2"));
         assertEquals(l.get(0).size(), 1);
         
-        Operator m2 = l.get(1).getRoots().get(0);
+        Operator m2 = l.get(1).getSources().get(0);
         assertTrue(m2.getName().equals("p1") || m2.getName().equals("p2"));
         assertEquals(l.get(1).size(), 1);
         assertNotSame(m1.getName(), m2.getName());
@@ -162,12 +213,12 @@
         l = r.match(plan);
         assertEquals(l.size(), 4);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("f1") || m1.getName().equals("f2") 
                 || m1.getName().equals("f3") || m1.getName().equals("f4"));
         assertEquals(l.get(0).size(), 1);
         
-        m2 = l.get(1).getRoots().get(0);
+        m2 = l.get(1).getSources().get(0);
         assertTrue(m1.getName().equals("f1") || m1.getName().equals("f2") 
                 || m1.getName().equals("f3") || m1.getName().equals("f4"));
         assertEquals(l.get(1).size(), 1);
@@ -180,11 +231,11 @@
         l = r.match(plan);
         assertEquals(l.size(), 2);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("s1") || m1.getName().equals("s2"));
         assertEquals(l.get(0).size(), 1);
         
-        m2 = l.get(1).getRoots().get(0);
+        m2 = l.get(1).getSources().get(0);
         assertTrue(m2.getName().equals("s1") || m2.getName().equals("s2"));
         assertEquals(l.get(1).size(), 1);
         assertNotSame(m1.getName(), m2.getName());
@@ -196,7 +247,7 @@
         l = r.match(plan);
         assertEquals(l.size(), 1);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("t1"));
         assertEquals(l.get(0).size(), 1);
         
@@ -207,7 +258,7 @@
         l = r.match(plan);
         assertEquals(l.size(), 1);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("j1"));
         assertEquals(l.get(0).size(), 1);
       
@@ -223,13 +274,13 @@
         List<OperatorPlan> l = r.match(plan);
         assertEquals(l.size(), 1);
         
-        assertEquals(l.get(0).getRoots().size(), 2);
-        assertEquals(l.get(0).getLeaves().size(), 2);
+        assertEquals(l.get(0).getSources().size(), 2);
+        assertEquals(l.get(0).getSinks().size(), 2);
         assertEquals(l.get(0).size(), 2);
         
-        Operator m1 = l.get(0).getRoots().get(0);
+        Operator m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("p1") || m1.getName().equals("p2"));
-        Operator m2 = l.get(0).getRoots().get(1);
+        Operator m2 = l.get(0).getSources().get(1);
         assertTrue(m2.getName().equals("p1") || m2.getName().equals("p2"));       
         assertNotSame(m1.getName(), m2.getName());
        
@@ -246,13 +297,13 @@
         l = r.match(plan);
         assertEquals(l.size(), 1);
         
-        assertEquals(l.get(0).getRoots().size(), 1);
-        assertEquals(l.get(0).getLeaves().size(), 1);
+        assertEquals(l.get(0).getSources().size(), 1);
+        assertEquals(l.get(0).getSinks().size(), 1);
         assertEquals(l.get(0).size(), 2);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("j1"));
-        m2 = l.get(0).getLeaves().get(0);
+        m2 = l.get(0).getSinks().get(0);
         assertTrue(m2.getName().equals("f1"));       
        
   
@@ -268,8 +319,8 @@
         l = r.match(plan);
         assertEquals(2, l.size());
         
-        assertEquals(l.get(0).getRoots().size(), 1);
-        assertEquals(l.get(0).getLeaves().size(), 1);                     
+        assertEquals(l.get(0).getSources().size(), 1);
+        assertEquals(l.get(0).getSinks().size(), 1);                     
         
         // search for 2 loads, then join
         pattern = new SillyPlan();
@@ -301,15 +352,15 @@
         l = r.match(plan);
         assertEquals(1, l.size());
         
-        assertEquals(l.get(0).getRoots().size(), 1);
-        assertEquals(l.get(0).getLeaves().size(), 2);
+        assertEquals(l.get(0).getSources().size(), 1);
+        assertEquals(l.get(0).getSinks().size(), 2);
         assertEquals(l.get(0).size(), 3);
         
-        m1 = l.get(0).getRoots().get(0);
+        m1 = l.get(0).getSources().get(0);
         assertTrue(m1.getName().equals("t1"));
-        m2 = l.get(0).getLeaves().get(0);
+        m2 = l.get(0).getSinks().get(0);
         assertTrue(m2.getName().equals("f3") || m2.getName().equals("f4"));    
-        m2 = l.get(0).getLeaves().get(1);
+        m2 = l.get(0).getSinks().get(1);
         assertTrue(m2.getName().equals("f3") || m2.getName().equals("f4"));    
     }
    

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,373 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.CastExpression;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.optimizer.UidStamper;
+import org.apache.pig.experimental.logical.relational.LOForEach;
+import org.apache.pig.experimental.logical.relational.LOGenerate;
+import org.apache.pig.experimental.logical.relational.LOInnerLoad;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.TestCase;
+
+public class TestLogicalPlanMigrationVisitor extends TestCase {
+
+    public void testSimplePlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("b = filter a by $0==NULL;");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        // Script is load -> filter -> store; the plan returned by migratePlan() is inspected below.
+        // check basics: three operators and a single source
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        assertEquals(3, newPlan.size());
+        assertEquals(newPlan.getSources().size(), 1);
+        
+        // check load: the only source is an LOLoad
+        LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSources().get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+        
+        // check filter: the load's first successor is an LOFilter
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class);
+        
+        LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan();
+        // the filter plan's source is an equality whose left side projects column 0 of input 0
+        EqualExpression eq = (EqualExpression)exp.getSources().get(0);
+        assertEquals(eq.getLhs().getClass(), ProjectExpression.class);
+        assertEquals(((ProjectExpression)eq.getLhs()).getColNum(), 0);
+        assertEquals(((ProjectExpression)eq.getLhs()).getInputNum(), 0);
+        // ... and whose right side is a constant
+        assertEquals(eq.getRhs().getClass(), ConstantExpression.class);
+        
+        // check store: the filter's first successor is an LOStore
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class);
+    }
+    
+    public void testPlanWithCast() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, c);");
+        lpt.buildPlan("b = filter a by (int)id==10;");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        // Script filters on a cast column; the plan returned by migratePlan() is inspected below.
+        // check basics: three operators and a single source
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        assertEquals(3, newPlan.size());
+        assertEquals(newPlan.getSources().size(), 1);
+        
+        // check load: the only source is an LOLoad
+        LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSources().get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+        
+        // check filter: the load's first successor is an LOFilter
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class);
+        
+        LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan();
+        // the filter plan's source is an equality whose left side is a cast
+        EqualExpression eq = (EqualExpression)exp.getSources().get(0);
+        assertEquals(eq.getLhs().getClass(), CastExpression.class);
+        // NOTE(review): the next assertion repeats the one directly above
+        assertEquals(eq.getLhs().getClass(), CastExpression.class);
+        LogicalExpression ep = (LogicalExpression)exp.getSuccessors(eq.getLhs()).get(0);
+        assertEquals(ep.getClass(), ProjectExpression.class);
+        assertEquals(((ProjectExpression)ep).getColNum(), 0);
+        assertEquals(((ProjectExpression)ep).getInputNum(), 0);
+        // the cast's first successor projects column 0 of input 0; the right side is a constant
+        assertEquals(eq.getRhs().getClass(), ConstantExpression.class);
+        
+        // check store: the filter's first successor is an LOStore
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class);
+    }
+    
+    public void testJoinPlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd1.txt' as (id, c);");
+        lpt.buildPlan("b = load 'd2.txt'as (id, c);");
+        lpt.buildPlan("c = join a by id, b by c;");
+        lpt.buildPlan("d = filter c by a::id==NULL AND b::c==NULL;");        
+        LogicalPlan plan = lpt.buildPlan("store d into 'empty';");
+        // Script: two loads joined (a by id, b by c), then filter and store; the migrated plan is inspected below.
+        // check basics: five operators and two sources
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        assertEquals(5, newPlan.size());
+        assertEquals(newPlan.getSources().size(), 2);
+       
+        // check load and join: the first source's successor is a HASH LOJoin fed by loads "a" and "b"
+        LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSuccessors(newPlan.getSources().get(0)).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOJoin.class);
+        assertEquals(((LOJoin)op).getJoinType(), LOJoin.JOINTYPE.HASH);
+        
+        LogicalRelationalOperator l1 = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(0);
+        assertEquals(l1.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+        assertEquals(l1.getAlias(), "a");
+        
+        LogicalRelationalOperator l2 = (LogicalRelationalOperator)newPlan.getPredecessors(op).get(1);
+        assertEquals(l2.getClass(), org.apache.pig.experimental.logical.relational.LOLoad.class);
+        assertEquals(l2.getAlias(), "b");
+
+        // check join input plans: each holds one ProjectExpression (input 0 col 0, then input 1 col 1)
+        LogicalExpressionPlan p1 = ((LOJoin)op).getJoinPlan(0).iterator().next();
+        assertEquals(p1.size(), 1);
+        
+        ProjectExpression prj = (ProjectExpression)p1.getSources().get(0);
+       
+        assertEquals(prj.getInputNum(), 0);
+        assertEquals(prj.getColNum(), 0);
+        
+        LogicalExpressionPlan p2 = ((LOJoin)op).getJoinPlan(1).iterator().next();
+        assertEquals(p2.size(), 1);
+        
+        prj = (ProjectExpression)p2.getSources().get(0);
+     
+        assertEquals(prj.getInputNum(), 1);
+        assertEquals(prj.getColNum(), 1);
+        
+        // check filter: an AND over two equalities, each comparing a projection with a constant
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOFilter.class);        
+        LogicalExpressionPlan exp = ((org.apache.pig.experimental.logical.relational.LOFilter)op).getFilterPlan();
+        
+        AndExpression ae = (AndExpression)exp.getSources().get(0);
+        // first equality projects column 0 of input 0
+        EqualExpression eq = (EqualExpression)exp.getSuccessors(ae).get(0);
+        assertEquals(eq.getLhs().getClass(), ProjectExpression.class);
+        assertEquals(((ProjectExpression)eq.getLhs()).getColNum(), 0);
+        assertEquals(((ProjectExpression)eq.getLhs()).getInputNum(), 0);
+        
+        assertEquals(eq.getRhs().getClass(), ConstantExpression.class);
+        // second equality projects column 3 of input 0 (presumably b::c in the join output -- confirm)
+        eq = (EqualExpression)exp.getSuccessors(ae).get(1);
+        assertEquals(eq.getLhs().getClass(), ProjectExpression.class);
+        assertEquals(((ProjectExpression)eq.getLhs()).getColNum(), 3);
+        assertEquals(((ProjectExpression)eq.getLhs()).getInputNum(), 0);
+        
+        assertEquals(eq.getRhs().getClass(), ConstantExpression.class);
+        
+        // check store: the filter's first successor is an LOStore
+        op = (LogicalRelationalOperator)newPlan.getSuccessors(op).get(0);
+        assertEquals(op.getClass(), org.apache.pig.experimental.logical.relational.LOStore.class); 
+    }
+    
+    public void testForeachPlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (id, d);");
+        lpt.buildPlan("b = foreach a generate id, FLATTEN(d);");        
+        LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
+        // Hand-builds the expected new-style plan and compares it with the plan returned by migratePlan().
+        // check basics
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan expected = 
+            new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        // expected load: fields "id" and "d", both BYTEARRAY
+        LogicalSchema aschema = new LogicalSchema();    	
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("d", null, DataType.BYTEARRAY));
+        LOLoad load = new LOLoad(new FileSpec("file:/test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected);
+        expected.add(load);
+        // expected foreach: an inner plan in which two LOInnerLoads are connected to one LOGenerate
+        LOForEach foreach = new LOForEach(expected);
+        org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        LOInnerLoad l1 = new LOInnerLoad(innerPlan, foreach, 0);
+        innerPlan.add(l1);
+        LOInnerLoad l2 = new LOInnerLoad(innerPlan, foreach, 1);
+        // NOTE(review): l2 is never passed to innerPlan.add(), unlike l1 -- confirm this is intended
+        List<LogicalExpressionPlan> eps = new ArrayList<LogicalExpressionPlan>();
+        LogicalExpressionPlan p1 = new LogicalExpressionPlan();
+        p1.add(new ProjectExpression(p1, DataType.BYTEARRAY, 0, 0));
+        LogicalExpressionPlan p2 = new LogicalExpressionPlan();
+        p2.add(new ProjectExpression(p2, DataType.BYTEARRAY, 1, 0));
+        eps.add(p1);
+        eps.add(p2);
+        // second flag is true -- presumably marks FLATTEN(d); confirm against LOGenerate
+        LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true});
+        innerPlan.add(gen);
+        innerPlan.connect(l1, gen);
+        innerPlan.connect(l2, gen);
+        
+        foreach.setInnerPlan(innerPlan);    	
+        expected.add(foreach);
+        
+        LOStore s = new LOStore(expected, new FileSpec("file:/test/empty", new FuncSpec("org.apache.pig.builtin.PigStorage")));
+      
+        expected.add(s);
+        
+        expected.connect(load, foreach);
+        expected.connect(foreach, s);
+        // run UidStamper over the expected plan; any exception is rethrown wrapped in a VisitorException
+        try {
+            UidStamper stamper = new UidStamper(expected);
+            stamper.visit();         
+        }catch(Exception e) {
+            throw new VisitorException(e);
+        }
+        
+        assertTrue(expected.isEqual(newPlan));
+        // schema of the hand-built foreach is expected to be (id, d), both BYTEARRAY
+        LogicalSchema schema = foreach.getSchema();
+        aschema = new LogicalSchema();
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("d", null, DataType.BYTEARRAY));
+        assertTrue(schema.isEqual(aschema));
+    }
+
+    public void testForeachSchema() throws Exception {
+        // test flatten: d is a tuple (v, s); the migrated plan's sink schema is expected to be (id, v, s)
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (id, d:tuple(v, s));");
+        LogicalPlan plan = lpt.buildPlan("b = foreach a generate id, FLATTEN(d);");  
+                
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        LogicalRelationalOperator op = (LogicalRelationalOperator)newPlan.getSinks().get(0);
+        // expected schema: three BYTEARRAY fields with no nested schema
+        LogicalSchema s2 = new LogicalSchema();
+        s2.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        s2.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+        s2.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+        assertTrue(s2.isEqual(op.getSchema()));
+        
+        // test no flatten: d is a bag generated as-is; the sink schema is expected to keep d as bag{t:(v, s)}
+        lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (id, d:bag{t:(v, s)});");
+        plan = lpt.buildPlan("b = foreach a generate id, d;");  
+                
+        newPlan = migratePlan(plan);
+        op = (LogicalRelationalOperator)newPlan.getSinks().get(0);
+        // expected schema: id plus BAG field d, which nests TUPLE field t with fields v and s
+        LogicalSchema aschema = new LogicalSchema();    	
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        LogicalSchema aschema2 = new LogicalSchema();
+        LogicalSchema aschema3 = new LogicalSchema();
+        aschema3.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+        aschema3.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+        aschema2.addField(new LogicalSchema.LogicalFieldSchema("t", aschema3, DataType.TUPLE));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("d", aschema2, DataType.BAG));  
+        
+        assertTrue(aschema.isEqual(op.getSchema()));
+    }
+    
+    public void testForeachPlan2() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load '/test/d.txt' as (id, d:bag{t:(v, s)});");
+        lpt.buildPlan("b = foreach a generate id, FLATTEN(d);");        
+        LogicalPlan plan = lpt.buildPlan("store b into '/test/empty';");
+        
+        // check basics
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan expected = 
+            new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        
+        LogicalSchema aschema = new LogicalSchema();    	
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        LogicalSchema aschema2 = new LogicalSchema();
+        LogicalSchema aschema3 = new LogicalSchema();
+        aschema3.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+        aschema3.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+        aschema2.addField(new LogicalSchema.LogicalFieldSchema("t", aschema3, DataType.TUPLE));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("d", aschema2, DataType.BAG));        
+        
+        LOLoad load = new LOLoad(new FileSpec("file:/test/d.txt", new FuncSpec("org.apache.pig.builtin.PigStorage")), aschema, expected);
+        expected.add(load);         
+        
+        LOForEach foreach2 = new LOForEach(expected);
+        org.apache.pig.experimental.logical.relational.LogicalPlan innerPlan = new org.apache.pig.experimental.logical.relational.LogicalPlan();
+        LOInnerLoad l1 = new LOInnerLoad(innerPlan, foreach2, 0);
+        innerPlan.add(l1);
+        LOInnerLoad l2 = new LOInnerLoad(innerPlan, foreach2, 1);
+        
+        List<LogicalExpressionPlan>  eps = new ArrayList<LogicalExpressionPlan>();
+        LogicalExpressionPlan p1 = new LogicalExpressionPlan();
+        new ProjectExpression(p1, DataType.BYTEARRAY, 0, 0);
+        LogicalExpressionPlan p2 = new LogicalExpressionPlan();        
+        new ProjectExpression(p2, DataType.BAG, 1, 0);
+        eps.add(p1);
+        eps.add(p2);
+        
+        LOGenerate gen = new LOGenerate(innerPlan, eps, new boolean[] {false, true});
+        innerPlan.add(gen);
+        innerPlan.connect(l1, gen);
+        innerPlan.connect(l2, gen);
+        
+        foreach2.setInnerPlan(innerPlan);    	
+        expected.add(foreach2); 
+                
+        LOStore s = new LOStore(expected, new FileSpec("file:/test/empty", new FuncSpec("org.apache.pig.builtin.PigStorage")));
+      
+        expected.add(s);
+        
+        expected.connect(load, foreach2);
+    
+        expected.connect(foreach2, s);
+        try {
+            UidStamper stamper = new UidStamper(expected);
+            stamper.visit();         
+        }catch(Exception e) {
+            throw new VisitorException(e);
+        }
+        
+        assertTrue(expected.isEqual(newPlan));
+        
+        LogicalSchema schema = foreach2.getSchema();
+        aschema = new LogicalSchema();
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+        aschema.addField(new LogicalSchema.LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+        assertTrue(schema.isEqual(aschema));
+    }
+    
+    /**
+     * Converts an old-style logical plan into an experimental logical plan by
+     * running a {@code LogicalPlanMigrationVistor} over it, then runs a
+     * {@code UidStamper} over the result.
+     *
+     * @param lp the old-style logical plan to migrate
+     * @return the migrated plan, after the UidStamper pass
+     * @throws VisitorException if the migration visit fails, or wrapping any
+     *         exception raised while constructing or running the UidStamper
+     */
+    private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException {
+        LogicalPlanMigrationVistor migrator = new LogicalPlanMigrationVistor(lp);
+        migrator.visit();
+
+        org.apache.pig.experimental.logical.relational.LogicalPlan migrated = migrator.getNewLogicalPlan();
+        try {
+            new UidStamper(migrated).visit();
+        } catch (Exception cause) {
+            // Surface every stamping failure under the single exception type this helper declares.
+            throw new VisitorException(cause);
+        }
+        return migrated;
+    }
+}