You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/04 19:46:48 UTC

svn commit: r982345 [13/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/lo...

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,1536 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.BaseOperatorPlan;
+import org.apache.pig.newplan.DependencyOrderWalker;
+import org.apache.pig.newplan.DepthFirstWalker;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanEdge;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.PlanWalker;
+import org.apache.pig.newplan.ReverseDependencyOrderWalker;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOJoin;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LOJoin.JOINTYPE;
+import org.apache.pig.newplan.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+public class TestNewPlanOperatorPlan extends TestCase {
+    
+    private static class SillyPlan extends BaseOperatorPlan {
+        
+        SillyPlan() {
+            super();
+        }
+    }
+    
+    private static class SillyOperator extends Operator {
+        private String name;
+        
+        SillyOperator(String n, SillyPlan p) {
+            super(n, p);
+            name = n;
+        }
+
+        @Override
+        public void accept(PlanVisitor v) {
+            if (v instanceof SillyVisitor) {
+                ((SillyVisitor)v).visitSillyOperator(this);
+            }
+        }
+
+        @Override
+        public boolean isEqual(Operator operator) {
+            return ( name.compareTo(operator.getName()) == 0 );
+        }
+    }
+    
+    private static class SillyVisitor extends PlanVisitor {
+        
+        StringBuffer buf;
+
+        protected SillyVisitor(OperatorPlan plan, PlanWalker walker) {
+            super(plan, walker);
+            buf = new StringBuffer();
+        }
+        
+        public void visitSillyOperator(SillyOperator so) {
+            buf.append(so.getName());
+        }
+        
+        public String getVisitPattern() {
+            return buf.toString();
+        }
+        
+    }
+    
+    // Tests for PlanEdge
+    
+    @Test
+    public void testPlanEdgeInsert() {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        PlanEdge edges = new PlanEdge();
+        
+        // Test initial entry
+        edges.put(fred, joe, 0);
+        Collection<Operator> c = edges.get(fred);
+        assertEquals(1, c.size());
+        Operator[] a = new Operator[1];
+        Operator[] b = c.toArray(a);
+        assertEquals(joe, b[0]);
+        
+        // Test entry with no position
+        SillyOperator bob = new SillyOperator("bob", plan);
+        edges.put(fred, bob);
+        c = edges.get(fred);
+        assertEquals(2, c.size());
+        a = new Operator[2];
+        b = c.toArray(a);
+        assertEquals(joe, b[0]);
+        assertEquals(bob, b[1]);
+        
+        // Test entry with position
+        SillyOperator jill = new SillyOperator("jill", plan);
+        edges.put(fred, jill, 1);
+        c = edges.get(fred);
+        assertEquals(3, c.size());
+        a = new Operator[3];
+        b = c.toArray(a);
+        assertEquals(joe, b[0]);
+        assertEquals(jill, b[1]);
+        assertEquals(bob, b[2]);
+    }
+    
+    // Test that entry with invalid position cannot be made.
+    @Test
+    public void testPlanEdgeInsertFirstIndexBad() {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        PlanEdge edges = new PlanEdge();
+        boolean caught = false;
+        try {
+            edges.put(fred, joe, 1);
+        } catch (IndexOutOfBoundsException e) {
+            caught = true;
+        }
+        assertTrue(caught);
+        
+        caught = false;
+        edges.put(fred, joe);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        try {
+            edges.put(fred, bob, 2);
+        } catch (IndexOutOfBoundsException e) {
+            caught = true;
+        }
+        assertTrue(caught);
+    }
+    
+    // Test OperatorPlan
+    @Test
+    public void testOperatorPlan() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        SillyOperator jim = new SillyOperator("jim", plan);
+        SillyOperator sam = new SillyOperator("sam", plan);
+        
+        // Test that roots and leaves are empty when there are no operators in
+        // plan.
+        List<Operator> list = plan.getSources();
+        assertEquals(0, list.size());
+        list = plan.getSinks();
+        assertEquals(0, list.size());
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        plan.add(jim);
+        plan.add(sam);
+        
+        // Test that when not connected all nodes are roots and leaves.
+        list = plan.getSources();
+        assertEquals(5, list.size());
+        list = plan.getSinks();
+        assertEquals(5, list.size());
+        
+        // Connect them up
+        plan.connect(fred, bob);
+        plan.connect(joe, bob);
+        plan.connect(bob, jim);
+        plan.connect(bob, sam);
+        
+        // Check that the roots and leaves came out right
+        list = plan.getSources();
+        assertEquals(2, list.size());
+        for (Operator op : list) {
+            assertTrue(fred.isEqual(op) || joe.isEqual(op));
+        }
+        list = plan.getSinks();
+        assertEquals(2, list.size());
+        for (Operator op : list) {
+            assertTrue(jim.isEqual(op) || sam.isEqual(op));
+        }
+        
+        // Check each of their successors and predecessors
+        list = plan.getSuccessors(fred);
+        assertEquals(1, list.size());
+        assertEquals(bob, list.get(0));
+        
+        list = plan.getSuccessors(joe);
+        assertEquals(1, list.size());
+        assertEquals(bob, list.get(0));
+        
+        list = plan.getPredecessors(jim);
+        assertEquals(1, list.size());
+        assertEquals(bob, list.get(0));
+        
+        list = plan.getPredecessors(sam);
+        assertEquals(1, list.size());
+        assertEquals(bob, list.get(0));
+        
+        list = plan.getPredecessors(bob);
+        assertEquals(2, list.size());
+        assertEquals(fred, list.get(0));
+        assertEquals(joe, list.get(1));
+        
+        list = plan.getSuccessors(bob);
+        assertEquals(2, list.size());
+        assertEquals(jim, list.get(0));
+        assertEquals(sam, list.get(1));
+        
+        // Now try swapping two, and check that all comes out as planned
+        Pair<Integer, Integer> p1 = plan.disconnect(bob, jim);
+        Pair<Integer, Integer> p2 = plan.disconnect(fred, bob);
+        
+        plan.connect(bob, p1.first, fred, p1.second);
+        plan.connect(jim, p2.first, bob, p2.second);
+        
+         // Check that the roots and leaves came out right
+        list = plan.getSources();
+        assertEquals(2, list.size());
+        for (Operator op : list) {
+            assertTrue(jim.isEqual(op) || joe.isEqual(op));
+        }
+        list = plan.getSinks();
+        assertEquals(2, list.size());
+        for (Operator op : list) {
+            assertTrue(fred.isEqual(op) || sam.isEqual(op));
+        }
+        
+        // Check each of their successors and predecessors
+        list = plan.getSuccessors(jim);
+        assertEquals(1, list.size());
+        assertEquals(bob, list.get(0));
+        
+        list = plan.getSuccessors(joe);
+        assertEquals(1, list.size());
+        assertEquals(bob, list.get(0));
+        
+        list = plan.getPredecessors(fred);
+        assertEquals(1, list.size());
+        assertEquals(bob, list.get(0));
+        
+        list = plan.getPredecessors(sam);
+        assertEquals(1, list.size());
+        assertEquals(bob, list.get(0));
+        
+        list = plan.getPredecessors(bob);
+        assertEquals(2, list.size());
+        assertEquals(jim, list.get(0));
+        assertEquals(joe, list.get(1));
+        
+        list = plan.getSuccessors(bob);
+        assertEquals(2, list.size());
+        assertEquals(fred, list.get(0));
+        assertEquals(sam, list.get(1));
+        
+    }
+    
+    @Test
+    public void testDisconnectAndRemove() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        
+        plan.connect(fred, joe);
+        
+        plan.remove(bob);
+        plan.disconnect(fred, joe);
+        
+        List<Operator> list = plan.getSources();
+        assertEquals(2, list.size());
+        list = plan.getSinks();
+        assertEquals(2, list.size());
+        
+        plan.remove(fred);
+        plan.remove(joe);
+        
+        assertEquals(0, plan.size());
+        
+        list = plan.getSources();
+        assertEquals(0, list.size());
+        list = plan.getSinks();
+        assertEquals(0, list.size());
+    }
+    
+    // Test bad remove
+    @Test
+    public void testRemoveNegative() {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        
+        plan.connect(fred, joe);
+        
+        boolean caught = false;
+        try {
+            plan.remove(fred);
+        } catch (IOException e) {
+            caught = true;
+        }
+        assertTrue(caught);
+        
+        caught = false;
+        try {
+            plan.remove(joe);
+        } catch (IOException e) {
+            caught = true;
+        }
+        assertTrue(caught);
+        
+    }
+    
+    @Test
+    public void testDisconnectNegative() {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        
+        boolean caught = false;
+        try {
+            plan.disconnect(fred, joe);
+        } catch (IOException e) {
+            caught = true;
+        }
+        assertTrue(caught);
+        
+    }
+    
+    // Tests for DependencyOrderWalker
+    
+    @Test
+    public void testDependencyOrderWalkerLinear() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        
+        plan.connect(fred, joe);
+        plan.connect(joe, bob);
+        
+        SillyVisitor v =
+            new SillyVisitor(plan, new DependencyOrderWalker(plan));
+        
+        v.visit();
+        
+        String s = v.getVisitPattern();
+        
+        assertEquals("fredjoebob", s);
+    }
+
+    @Test
+    public void testDependencyOrderWalkerTree() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        SillyOperator jill = new SillyOperator("jill", plan);
+        SillyOperator jane = new SillyOperator("jane", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        plan.add(jill);
+        plan.add(jane);
+        
+        plan.connect(fred, bob);
+        plan.connect(joe, bob);
+        plan.connect(bob, jill);
+        plan.connect(jane, jill);
+        
+        SillyVisitor v =
+            new SillyVisitor(plan, new DependencyOrderWalker(plan));
+        
+        v.visit();
+        
+        String s = v.getVisitPattern();
+        
+        if (!s.equals("fredjoebobjanejill") &&
+                !s.equals("joefredbobjanejill") &&
+                !s.equals("janefredjoebobjill") &&
+                !s.equals("janejoefredbobjill")) {
+            System.out.println("Invalid order " + s);
+            fail();
+        }
+    }
+
+    @Test
+    public void testDependencyOrderWalkerGraph() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        SillyOperator jill = new SillyOperator("jill", plan);
+        SillyOperator jane = new SillyOperator("jane", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        plan.add(jill);
+        plan.add(jane);
+        
+        plan.connect(fred, bob);
+        plan.connect(joe, bob);
+        plan.connect(bob, jill);
+        plan.connect(bob, jane);
+        
+        SillyVisitor v =
+            new SillyVisitor(plan, new DependencyOrderWalker(plan));
+        
+        v.visit();
+        
+        String s = v.getVisitPattern();
+        
+        if (!s.equals("fredjoebobjanejill") &&
+                !s.equals("joefredbobjanejill") &&
+                !s.equals("fredjoebobjilljane") &&
+                !s.equals("joefredbobjilljane")) {
+            System.out.println("Invalid order " + s);
+            fail();
+        }
+    }
+
+    // Tests for DepthFirstWalker
+    
+    @Test
+    public void testDepthFirstWalkerLinear() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        
+        plan.connect(fred, joe);
+        plan.connect(joe, bob);
+        
+        SillyVisitor v =
+            new SillyVisitor(plan, new DepthFirstWalker(plan));
+        
+        v.visit();
+        
+        String s = v.getVisitPattern();
+        
+        assertEquals("fredjoebob", s);
+    }
+
+    @Test
+    public void testDepthFirstWalkerTree() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        SillyOperator jill = new SillyOperator("jill", plan);
+        SillyOperator jane = new SillyOperator("jane", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        plan.add(jill);
+        plan.add(jane);
+        
+        plan.connect(fred, bob);
+        plan.connect(fred, joe);
+        plan.connect(joe, jill);
+        plan.connect(joe, jane);
+        
+        SillyVisitor v =
+            new SillyVisitor(plan, new DepthFirstWalker(plan));
+        
+        v.visit();
+        
+        String s = v.getVisitPattern();
+        
+        assertEquals("fredbobjoejilljane", s);
+    }
+
+    @Test
+    public void testDepthFirstWalkerGraph() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        SillyOperator jill = new SillyOperator("jill", plan);
+        SillyOperator jane = new SillyOperator("jane", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        plan.add(jill);
+        plan.add(jane);
+        
+        plan.connect(fred, bob);
+        plan.connect(joe, bob);
+        plan.connect(bob, jill);
+        plan.connect(bob, jane);
+        
+        SillyVisitor v =
+            new SillyVisitor(plan, new DepthFirstWalker(plan));
+        
+        v.visit();
+        
+        String s = v.getVisitPattern();
+        
+        if (!s.equals("fredbobjilljanejoe") &&
+                !s.equals("joebobjilljanefred")) {
+            System.out.println("Invalid order " + s);
+            fail();
+        }
+    }
+
+    // Tests for ReverseDependencyOrderWalker
+    
+    @Test
+    public void testReverseDependencyOrderWalkerLinear() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        
+        plan.connect(fred, joe);
+        plan.connect(joe, bob);
+        
+        SillyVisitor v =
+            new SillyVisitor(plan, new ReverseDependencyOrderWalker(plan));
+        
+        v.visit();
+        
+        String s = v.getVisitPattern();
+        
+        assertEquals("bobjoefred", s);
+    }
+
+    @Test
+    public void testReverseDependencyOrderWalkerTree() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        SillyOperator jill = new SillyOperator("jill", plan);
+        SillyOperator jane = new SillyOperator("jane", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        plan.add(jill);
+        plan.add(jane);
+        
+        plan.connect(fred, bob);
+        plan.connect(joe, bob);
+        plan.connect(bob, jill);
+        plan.connect(jane, jill);
+        
+        SillyVisitor v =
+            new SillyVisitor(plan, new ReverseDependencyOrderWalker(plan));
+        
+        v.visit();
+        
+        String s = v.getVisitPattern();
+        
+        if (!s.equals("jilljanebobjoefred") &&
+                !s.equals("jilljanebobfredjoe") &&
+                !s.equals("jillbobjoefredjane") &&
+                !s.equals("jillbobjoejanefred") &&
+                !s.equals("jillbobfredjoejane") &&
+                !s.equals("jillbobfredjanejoe") &&
+                !s.equals("jillbobjanejoefred") &&
+                !s.equals("jillbobjanefredjoe")) {
+            System.out.println("Invalid order " + s);
+            fail();
+        }
+    }
+
+    @Test
+    public void testReverseDependencyOrderWalkerGraph() throws IOException {
+        SillyPlan plan = new SillyPlan();
+        SillyOperator fred = new SillyOperator("fred", plan);
+        SillyOperator joe = new SillyOperator("joe", plan);
+        SillyOperator bob = new SillyOperator("bob", plan);
+        SillyOperator jill = new SillyOperator("jill", plan);
+        SillyOperator jane = new SillyOperator("jane", plan);
+        
+        plan.add(fred);
+        plan.add(joe);
+        plan.add(bob);
+        plan.add(jill);
+        plan.add(jane);
+        
+        plan.connect(fred, bob);
+        plan.connect(joe, bob);
+        plan.connect(bob, jill);
+        plan.connect(bob, jane);
+        
+        SillyVisitor v =
+            new SillyVisitor(plan, new ReverseDependencyOrderWalker(plan));
+        
+        v.visit();
+        
+        String s = v.getVisitPattern();
+        
+        if (!s.equals("jilljanebobjoefred") &&
+                !s.equals("jilljanebobfredjoe") &&
+                !s.equals("janejillbobjoefred") &&
+                !s.equals("janejillbobfredjoe")) {
+            System.out.println("Invalid order " + s);
+            fail();
+        }
+    }
+    
+    private static class TestLogicalVisitor extends LogicalRelationalNodesVisitor {
+        
+        StringBuffer bf = new StringBuffer();
+
+        protected TestLogicalVisitor(OperatorPlan plan) {
+            super(plan, new DepthFirstWalker(plan));
+        }
+        
+        @Override
+        public void visit(LOLoad load) {
+            bf.append("load ");
+        }
+        
+        String getVisitPlan() {
+            return bf.toString();
+        }
+        
+    }
+    
+    @Test
+    public void testLogicalPlanVisitor() throws IOException {
+        LogicalPlan lp = new LogicalPlan();
+        LOLoad load = new LOLoad(null, null, lp, null);
+        /*lp.add((LogicalRelationalOperator)null, load,
+            (LogicalRelationalOperator)null);*/
+        lp.add(load);
+        
+        TestLogicalVisitor v = new TestLogicalVisitor(lp);
+        v.visit();
+        
+        assertEquals("load ", v.getVisitPlan());
+    }
+    
+    @Test
+    public void testBinaryOperatorOrder() throws IOException {
+        LogicalExpressionPlan ep = new LogicalExpressionPlan();
+        ConstantExpression c = new ConstantExpression(ep, new Integer(5), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        ProjectExpression p = new ProjectExpression(ep, 0, 0, null);
+        EqualExpression e = new EqualExpression(ep, p, c);
+        assertEquals(p, e.getLhs());
+        assertEquals(c, e.getRhs());
+        
+    }
+    
+    private static class TestExpressionVisitor extends LogicalExpressionVisitor {
+        
+        StringBuffer bf = new StringBuffer();
+
+        protected TestExpressionVisitor(OperatorPlan plan) {
+            super(plan, new DepthFirstWalker(plan));
+        }
+        
+        @Override
+        public void visit(AndExpression andExpr) {
+            bf.append("and ");
+        }
+        
+        @Override
+        public void visit(EqualExpression equal) {
+            bf.append("equal ");
+        }
+        
+        @Override
+        public void visit(ProjectExpression project) {
+            bf.append("project ");
+        }
+        
+        @Override
+        public void visit(ConstantExpression constant) {
+            bf.append("constant ");
+        }
+        
+        String getVisitPlan() {
+            return bf.toString();
+        }
+        
+    }
+    
+    @Test
+    public void testExpressionPlanVisitor() throws IOException {
+        LogicalExpressionPlan ep = new LogicalExpressionPlan();
+        ConstantExpression c = new ConstantExpression(ep, new Integer(5), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        ProjectExpression p = new ProjectExpression(ep, 0, 0, null);
+        EqualExpression e = new EqualExpression(ep, p, c);
+        ConstantExpression c2 = new ConstantExpression(ep, new Boolean("true"), new LogicalFieldSchema(null, null, DataType.BOOLEAN));
+        new AndExpression(ep, e, c2);
+        
+        TestExpressionVisitor v = new TestExpressionVisitor(ep);
+        v.visit();
+        assertEquals("and equal project constant constant ", v.getVisitPlan());
+    }
+    
+    @Test
+    public void testExpressionEquality() {
+        LogicalExpressionPlan ep1 = new LogicalExpressionPlan();
+        ConstantExpression c1 = new ConstantExpression(ep1, new Integer(5), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        ProjectExpression p1 = new ProjectExpression(ep1, 0, 0, null);
+        EqualExpression e1 = new EqualExpression(ep1, p1, c1);
+        ConstantExpression ca1 = new ConstantExpression(ep1, new Boolean("true"), new LogicalFieldSchema(null, null, DataType.BOOLEAN));
+        AndExpression a1 = new AndExpression(ep1, e1, ca1);
+        
+        LogicalExpressionPlan ep2 = new LogicalExpressionPlan();
+        ConstantExpression c2 = new ConstantExpression(ep2, new Integer(5), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        ProjectExpression p2 = new ProjectExpression(ep2, 0, 0, null);
+        EqualExpression e2 = new EqualExpression(ep2, p2, c2);
+        ConstantExpression ca2 = new ConstantExpression(ep2, new Boolean("true"), new LogicalFieldSchema(null, null, DataType.BOOLEAN));
+        AndExpression a2 = new AndExpression(ep2, e2, ca2);
+        
+        assertTrue(ep1.isEqual(ep2));
+        assertTrue(c1.isEqual(c2));
+        assertTrue(p1.isEqual(p2));
+        assertTrue(e1.isEqual(e2));
+        assertTrue(ca1.isEqual(ca2));
+        assertTrue(a1.isEqual(a2));
+        
+        LogicalExpressionPlan ep3 = new LogicalExpressionPlan();
+        ConstantExpression c3 = new ConstantExpression(ep3, new Integer(3), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        ProjectExpression p3 = new ProjectExpression(ep3, 0, 1, null);
+        EqualExpression e3 = new EqualExpression(ep3, p3, c3);
+        ConstantExpression ca3 = new ConstantExpression(ep3, "true", new LogicalFieldSchema(null, null, DataType.CHARARRAY));
+        AndExpression a3 = new AndExpression(ep3, e3, ca3);
+        
+        assertFalse(ep1.isEqual(ep3));
+        assertFalse(c1.isEqual(c3));
+        assertFalse(p1.isEqual(p3));
+        assertFalse(e1.isEqual(e3));
+        assertFalse(ca1.isEqual(ca3));
+        assertFalse(a1.isEqual(a3));
+        
+        LogicalExpressionPlan ep4 = new LogicalExpressionPlan();
+        ProjectExpression p4 = new ProjectExpression(ep4, 1, 0, null);
+        
+        assertFalse(ep1.isEqual(ep4));
+        assertFalse(p1.isEqual(p4));
+    }
+    
+    @Test
+    public void testRelationalEquality() throws IOException {
+        // Build a plan that is the logical plan for
+        // A = load 'bla' as (x);
+        // B = load 'morebla' as (y);
+        // C = join A on x, B on y;
+        // D = filter C by y > 0;
+        
+        // A = load
+        LogicalPlan lp = new LogicalPlan();
+        {
+            LogicalSchema aschema = new LogicalSchema();
+            aschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "x", null, DataType.INTEGER));
+            LOLoad A = new LOLoad(new FileSpec("/abc",
+                new FuncSpec("/fooload", new String[] {"x", "y"})), aschema, lp, null);
+            lp.add(A);
+        
+            // B = load
+            LogicalSchema bschema = new LogicalSchema();
+            bschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "y", null, DataType.INTEGER));
+            LOLoad B = new LOLoad(new FileSpec("/def",
+                new FuncSpec("PigStorage", "\t")), bschema, lp, null);
+            lp.add(B);
+        
+            // C = join
+            LogicalSchema cschema = new LogicalSchema();
+            cschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "x", null, DataType.INTEGER));
+            cschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "y", null, DataType.INTEGER));
+            LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+            new ProjectExpression(aprojplan, 0, 0, null);
+            LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+            new ProjectExpression(bprojplan, 1, 0, null);
+            MultiMap<Integer, LogicalExpressionPlan> mm = 
+                new MultiMap<Integer, LogicalExpressionPlan>();
+            mm.put(0, aprojplan);
+            mm.put(1, bprojplan);
+            LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true});
+            C.neverUseForRealSetSchema(cschema);
+            lp.add(C);
+            lp.connect(A, C);
+            lp.connect(B, C);
+            
+            // D = filter
+            LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+            ProjectExpression fy = new ProjectExpression(filterPlan, 0, 1, null);
+            ConstantExpression fc = new ConstantExpression(filterPlan, new Integer(0), new LogicalFieldSchema(null, null, DataType.INTEGER));
+            new EqualExpression(filterPlan, fy, fc);
+            LOFilter D = new LOFilter(lp, filterPlan);
+            D.neverUseForRealSetSchema(cschema);
+            lp.add(D);
+            lp.connect(C, D);
+        }
+        
+        // Build a second similar plan to test equality
+        // A = load
+        LogicalPlan lp1 = new LogicalPlan();
+        {
+            LogicalSchema aschema = new LogicalSchema();
+            aschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "x", null, DataType.INTEGER));
+            LOLoad A = new LOLoad(new FileSpec("/abc",
+                new FuncSpec("/fooload", new String[] {"x", "y"})), aschema, lp1, null);
+            lp1.add(A);
+            
+            // B = load
+            LogicalSchema bschema = new LogicalSchema();
+            bschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "y", null, DataType.INTEGER));
+            LOLoad B = new LOLoad(new FileSpec("/def",
+                new FuncSpec("PigStorage", "\t")), bschema, lp1, null);
+            lp1.add(B);
+            
+            // C = join
+            LogicalSchema cschema = new LogicalSchema();
+            cschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "x", null, DataType.INTEGER));
+            cschema.addField(new LogicalSchema.LogicalFieldSchema(
+                "y", null, DataType.INTEGER));
+            LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+            new ProjectExpression(aprojplan, 0, 0, null);
+            LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+            new ProjectExpression(bprojplan, 1, 0, null);
+            MultiMap<Integer, LogicalExpressionPlan> mm = 
+                new MultiMap<Integer, LogicalExpressionPlan>();
+            mm.put(0, aprojplan);
+            mm.put(1, bprojplan);
+            LOJoin C = new LOJoin(lp1, mm, JOINTYPE.HASH, new boolean[] {true, true});
+            C.neverUseForRealSetSchema(cschema);
+            lp1.add(C);
+            lp1.connect(A, C);
+            lp1.connect(B, C);
+                
+            
+            // D = filter
+            LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+            ProjectExpression fy = new ProjectExpression(filterPlan, 0, 1, null);
+            ConstantExpression fc = new ConstantExpression(filterPlan, new Integer(0), new LogicalFieldSchema(null, null, DataType.INTEGER));
+            new EqualExpression(filterPlan, fy, fc);
+            LOFilter D = new LOFilter(lp1, filterPlan);
+            D.neverUseForRealSetSchema(cschema);
+            lp1.add(D);
+            lp1.connect(C, D);
+                
+        }
+        
+        assertTrue( lp.isEqual(lp1));
+    }
+    
+    @Test
+    public void testLoadEqualityDifferentFuncSpecCtorArgs() {
+        LogicalPlan lp = new LogicalPlan();
+        
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp, null);
+        lp.add(load1);
+        
+        LOLoad load2 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "z"})), aschema1, lp, null);
+        lp.add(load2);
+        
+        assertFalse(load1.isEqual(load2));
+    }
+    
+    @Test
+    public void testLoadEqualityDifferentNumFuncSpecCstorArgs() {
+        LogicalPlan lp = new LogicalPlan();
+        
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp, null);
+        lp.add(load1);
+        
+        LOLoad load3 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", "x")), aschema1, lp, null);
+        lp.add(load3);
+        
+        assertFalse(load1.isEqual(load3));
+    }
+    
+    @Test
+    public void testLoadEqualityDifferentFunctionNames() {
+        LogicalPlan lp = new LogicalPlan();
+        
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp, null);
+        lp.add(load1);
+        
+         // Different function names in FuncSpec
+        LOLoad load4 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foobar", new String[] {"x", "z"})), aschema1, lp, null);
+        lp.add(load4);
+        
+        assertFalse(load1.isEqual(load4));
+    }
+    
+    @Test
+    public void testLoadEqualityDifferentFileName() {
+        LogicalPlan lp = new LogicalPlan();
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp, null);
+        lp.add(load1);
+    
+        // Different file name
+        LOLoad load5 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("foo", new String[] {"x", "z"})), aschema1, lp, null);
+        lp.add(load5);
+        
+        assertFalse(load1.isEqual(load5));
+    }
+    
+    @Test
+    public void testRelationalEqualityDifferentSchema() {
+        LogicalPlan lp = new LogicalPlan();
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp, null);
+        lp.add(load1);
+        
+        // Different schema
+        LogicalSchema aschema2 = new LogicalSchema();
+        aschema2.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.CHARARRAY));
+        
+        LOLoad load6 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "z"})), aschema2, lp, null);
+        lp.add(load6);
+            
+        assertFalse(load1.isEqual(load6));
+    }
+    
+    @Test
+    public void testRelationalEqualityNullSchemas() {
+        LogicalPlan lp = new LogicalPlan();
+        // Test that two loads with no schema are still equal
+        LOLoad load7 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), null, lp, null);
+        lp.add(load7);
+        
+        LOLoad load8 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), null, lp, null);
+        lp.add(load8);
+        
+        assertTrue(load7.isEqual(load8));
+    }
+    
+    @Test
+    public void testRelationalEqualityOneNullOneNotNullSchema() {
+        LogicalPlan lp = new LogicalPlan();
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad load1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "y"})), aschema1, lp, null);
+        lp.add(load1);
+        
+        // Test that one with schema and one without breaks equality
+        LOLoad load9 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("foo", new String[] {"x", "z"})), null, lp, null);
+        lp.add(load9);
+        
+        assertFalse(load1.isEqual(load9));
+    }
+        
+    @Test
+    public void testFilterDifferentPredicates() {
+        LogicalPlan lp = new LogicalPlan();
+            
+        LogicalExpressionPlan fp1 = new LogicalExpressionPlan();
+        ProjectExpression fy1 = new ProjectExpression(fp1, 0, 1, null);
+        ConstantExpression fc1 = new ConstantExpression(fp1,
+            new Integer(0), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        new EqualExpression(fp1, fy1, fc1);
+        LOFilter D1 = new LOFilter(lp, fp1);
+        LogicalSchema cschema = new LogicalSchema();
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        D1.neverUseForRealSetSchema(cschema);
+        lp.add(D1);
+        
+        LogicalExpressionPlan fp2 = new LogicalExpressionPlan();
+        ProjectExpression fy2 = new ProjectExpression(fp2, 0, 1, null);
+        ConstantExpression fc2 = new ConstantExpression(fp2, 
+            new Integer(1), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        new EqualExpression(fp2, fy2, fc2);
+        LOFilter D2 = new LOFilter(lp, fp2);
+        D2.neverUseForRealSetSchema(cschema);
+        lp.add(D2);
+        
+        assertFalse(D1.isEqual(D2));
+    }
+        
+    // No tests for LOStore because it tries to actually instantiate the store
+    // func, and I don't want to mess with that here.
+    
+    @Test
+    public void testJoinDifferentJoinTypes() throws IOException {
+       LogicalPlan lp = new LogicalPlan();
+       LogicalSchema jaschema1 = new LogicalSchema();
+       jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+       LOLoad A1 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema1, lp, null);
+       lp.add(A1);
+        
+        // B = load
+        LogicalSchema jbschema1 = new LogicalSchema();
+        jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B1 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema1, lp, null);
+        lp.add(B1);
+        
+        // C = join
+        LogicalSchema jcschema1 = new LogicalSchema();
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan1, 0, 0, null);
+        LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan1, 1, 0, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm1 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm1.put(0, aprojplan1);
+        mm1.put(1, bprojplan1);
+        LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true});
+        C1.neverUseForRealSetSchema(jcschema1);
+        lp.add(C1);
+        lp.connect(A1, C1);
+        lp.connect(B1, C1);
+        
+        // A = load
+        LogicalSchema jaschema2 = new LogicalSchema();
+        jaschema2.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A2 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema2, lp, null);
+        lp.add(A2);
+        
+        // B = load
+        LogicalSchema jbschema2 = new LogicalSchema();
+        jbschema2.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B2 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema2, lp, null);
+        lp.add(B2);
+        
+        // C = join
+        LogicalSchema jcschema2 = new LogicalSchema();
+        jcschema2.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema2.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan2 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan2, 0, 0, null);
+        LogicalExpressionPlan bprojplan2 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan2, 1, 0, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm2 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm2.put(0, aprojplan2);
+        mm2.put(1, bprojplan2);
+        LOJoin C2 = new LOJoin(lp, mm2, JOINTYPE.SKEWED, new boolean[] {true, true});
+        C2.neverUseForRealSetSchema(jcschema2);
+        lp.add(C2);
+        lp.connect(A2, C2);
+        lp.connect(B2, C2);
+        
+        assertFalse(C1.isEqual(C2));
+    }
+    
+    @Test
+    public void testJoinDifferentInner() throws IOException {
+        LogicalPlan lp = new LogicalPlan();
+               LogicalSchema jaschema1 = new LogicalSchema();
+       jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+       LOLoad A1 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema1, lp, null);
+       lp.add(A1);
+        
+        // B = load
+        LogicalSchema jbschema1 = new LogicalSchema();
+        jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B1 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema1, lp, null);
+        lp.add(B1);
+        
+        // C = join
+        LogicalSchema jcschema1 = new LogicalSchema();
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan1, 0, 0, null);
+        LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan1, 1, 0, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm1 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm1.put(0, aprojplan1);
+        mm1.put(1, bprojplan1);
+        LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true});
+        C1.neverUseForRealSetSchema(jcschema1);
+        lp.add(C1);
+        lp.connect(A1, C1);
+        lp.connect(B1, C1);
+        
+ 
+        // Test different inner status
+        // A = load
+        LogicalSchema jaschema3 = new LogicalSchema();
+        jaschema3.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A3 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema3, lp, null);
+        lp.add(A3);
+        
+        // B = load
+        LogicalSchema jbschema3 = new LogicalSchema();
+        jbschema3.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B3 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema3, lp, null);
+        lp.add(B3);
+        
+        // C = join
+        LogicalSchema jcschema3 = new LogicalSchema();
+        jcschema3.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema3.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan3 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan3, 0, 0, null);
+        LogicalExpressionPlan bprojplan3 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan3, 1, 0, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm3 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm3.put(0, aprojplan3);
+        mm3.put(1, bprojplan3);
+        LOJoin C3 = new LOJoin(lp, mm3, JOINTYPE.HASH, new boolean[] {true, false});
+        C3.neverUseForRealSetSchema(jcschema3);
+        lp.add(C3);
+        lp.connect(A3, C3);
+        lp.connect(B3, C3);
+        
+        
+        assertFalse(C1.isEqual(C3));
+    }
+ 
+    @Test
+    public void testJoinDifferentNumInputs() throws IOException {
+        LogicalPlan lp = new LogicalPlan();
+               LogicalSchema jaschema1 = new LogicalSchema();
+       jaschema1.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+       LOLoad A1 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema1, lp, null);
+       lp.add(A1);
+        
+        // B = load
+        LogicalSchema jbschema1 = new LogicalSchema();
+        jbschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B1 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema1, lp, null);
+        lp.add(B1);
+        
+        // C = join
+        LogicalSchema jcschema1 = new LogicalSchema();
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan1, 0, 0, null);
+        LogicalExpressionPlan bprojplan1 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan1, 1, 0, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm1 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm1.put(0, aprojplan1);
+        mm1.put(1, bprojplan1);
+        LOJoin C1 = new LOJoin(lp, mm1, JOINTYPE.HASH, new boolean[] {true, true});
+        C1.neverUseForRealSetSchema(jcschema1);
+        lp.add(C1);
+        lp.connect(A1, C1);
+        lp.connect(B1, C1);
+ 
+        // A = load
+        LogicalSchema jaschema5 = new LogicalSchema();
+        jaschema5.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A5 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema5, lp, null);
+        lp.add(A5);
+        
+        // B = load
+        LogicalSchema jbschema5 = new LogicalSchema();
+        jbschema5.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad B5 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema5, lp, null);
+        lp.add(B5);
+        
+        // Beta = load
+        LogicalSchema jbetaschema5 = new LogicalSchema();
+        jbetaschema5.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LOLoad Beta5 = new LOLoad(new FileSpec("/ghi",
+            new FuncSpec("PigStorage", "\t")), jbetaschema5, lp, null);
+        lp.add(Beta5);
+        
+        // C = join
+        LogicalSchema jcschema5 = new LogicalSchema();
+        jcschema5.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema5.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan5 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan5, 0, 0, null);
+        LogicalExpressionPlan bprojplan5 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan5, 1, 0, null);
+        LogicalExpressionPlan betaprojplan5 = new LogicalExpressionPlan();
+        new ProjectExpression(betaprojplan5, 1, 0, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm5 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm5.put(0, aprojplan5);
+        mm5.put(1, bprojplan5);
+        mm5.put(2, betaprojplan5);
+        LOJoin C5 = new LOJoin(lp, mm5, JOINTYPE.HASH, new boolean[] {true, true});
+        C5.neverUseForRealSetSchema(jcschema5);
+        lp.add(C5);
+        lp.connect(A5, C5);
+        lp.connect(B5, C5);
+        lp.connect(Beta5, C5);
+        
+        assertFalse(C1.isEqual(C5));
+    }
+        
+    @Test
+    public void testJoinDifferentJoinKeys() throws IOException {
+        LogicalPlan lp = new LogicalPlan();
+        
+        // Test different join keys
+        LogicalSchema jaschema6 = new LogicalSchema();
+        jaschema6.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A6 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema6, lp, null);
+        lp.add(A6);
+        
+        // B = load
+        LogicalSchema jbschema6 = new LogicalSchema();
+        jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "z", null, DataType.LONG));
+        LOLoad B6 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema6, lp, null);
+        lp.add(B6);
+        
+        // C = join
+        LogicalSchema jcschema6 = new LogicalSchema();
+        jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan6, 0, 0, null);
+        LogicalExpressionPlan bprojplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan6, 1, 0, null);
+        LogicalExpressionPlan b2projplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(b2projplan6, 1, 1, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm6 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm6.put(0, aprojplan6);
+        mm6.put(1, bprojplan6);
+        mm6.put(1, b2projplan6);
+        LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] {true, true});
+        C6.neverUseForRealSetSchema(jcschema6);
+        lp.add(C6);
+        lp.connect(A6, C6);
+        lp.connect(B6, C6);
+        
+        
+        LogicalSchema jaschema7 = new LogicalSchema();
+        jaschema7.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A7 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema7, lp, null);
+        lp.add(A7);
+        
+        // B = load
+        LogicalSchema jbschema7 = new LogicalSchema();
+        jbschema7.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        jbschema7.addField(new LogicalSchema.LogicalFieldSchema(
+            "z", null, DataType.LONG));
+        LOLoad B7 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema7, lp, null);
+        lp.add(B7);
+        
+        // C = join
+        LogicalSchema jcschema7 = new LogicalSchema();
+        jcschema7.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema7.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan7 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan7, 0, 0, null);
+        LogicalExpressionPlan bprojplan7 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan7, 1, 1, null);
+        LogicalExpressionPlan b2projplan7 = new LogicalExpressionPlan();
+        new ProjectExpression(b2projplan7, 1, 0, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm7 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm7.put(0, aprojplan7);
+        mm7.put(1, bprojplan7);
+        mm7.put(1, b2projplan7);
+        LOJoin C7 = new LOJoin(lp, mm7, JOINTYPE.HASH, new boolean[] {true, true});
+        C7.neverUseForRealSetSchema(jcschema7);
+        lp.add(C7);
+        lp.connect(A7, C7);
+        lp.connect(B7, C7);
+        
+        assertFalse(C6.isEqual(C7));
+    }
+    
+    @Test
+    public void testJoinDifferentNumJoinKeys() throws IOException {
+        LogicalPlan lp = new LogicalPlan();
+        
+        // Test different join keys
+        LogicalSchema jaschema6 = new LogicalSchema();
+        jaschema6.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A6 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema6, lp, null);
+        lp.add(A6);
+        
+        // B = load
+        LogicalSchema jbschema6 = new LogicalSchema();
+        jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        jbschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "z", null, DataType.LONG));
+        LOLoad B6 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema6, lp, null);
+        lp.add(B6);
+        
+        // C = join
+        LogicalSchema jcschema6 = new LogicalSchema();
+        jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema6.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan6, 0, 0, null);
+        LogicalExpressionPlan bprojplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan6, 1, 0, null);
+        LogicalExpressionPlan b2projplan6 = new LogicalExpressionPlan();
+        new ProjectExpression(b2projplan6, 1, 1, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm6 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm6.put(0, aprojplan6);
+        mm6.put(1, bprojplan6);
+        mm6.put(1, b2projplan6);
+        LOJoin C6 = new LOJoin(lp, mm6, JOINTYPE.HASH, new boolean[] {true, true});
+        C6.neverUseForRealSetSchema(jcschema6);
+        lp.add(C6);
+        lp.connect(A6, C6);
+        lp.connect(B6, C6);
+        
+        // Test different different number of join keys
+        LogicalSchema jaschema8 = new LogicalSchema();
+        jaschema8.addField(new LogicalSchema.LogicalFieldSchema(
+           "x", null, DataType.INTEGER));
+        LOLoad A8 = new LOLoad(new FileSpec("/abc",
+           new FuncSpec("/fooload", new String[] {"x", "y"})), jaschema8, lp, null);
+        lp.add(A8);
+        
+        // B = load
+        LogicalSchema jbschema8 = new LogicalSchema();
+        jbschema8.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        jbschema8.addField(new LogicalSchema.LogicalFieldSchema(
+            "z", null, DataType.LONG));
+        LOLoad B8 = new LOLoad(new FileSpec("/def",
+            new FuncSpec("PigStorage", "\t")), jbschema8, lp, null);
+        lp.add(B8);
+        
+        // C = join
+        LogicalSchema jcschema8 = new LogicalSchema();
+        jcschema8.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        jcschema8.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        LogicalExpressionPlan aprojplan8 = new LogicalExpressionPlan();
+        new ProjectExpression(aprojplan8, 0, 0, null);
+        LogicalExpressionPlan bprojplan8 = new LogicalExpressionPlan();
+        new ProjectExpression(bprojplan8, 1, 0, null);
+        MultiMap<Integer, LogicalExpressionPlan> mm8 = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm8.put(0, aprojplan8);
+        mm8.put(1, bprojplan8);
+        LOJoin C8 = new LOJoin(lp, mm8, JOINTYPE.HASH, new boolean[] {true, true});
+        C8.neverUseForRealSetSchema(jcschema8);
+        lp.add(C8);
+        lp.connect(A8, C8);
+        lp.connect(B8, C8);
+        
+        assertFalse(C6.isEqual(C8));
+    }
+    
+    @Test
+    public void testRelationalSameOpDifferentPreds() throws IOException {
+        LogicalPlan lp1 = new LogicalPlan();
+        LogicalSchema aschema1 = new LogicalSchema();
+        aschema1.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        LOLoad A1 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("/fooload", new String[] {"x", "y"})), aschema1, lp1, null);
+        lp1.add(A1);
+        
+        LogicalExpressionPlan fp1 = new LogicalExpressionPlan();
+        ProjectExpression fy1 = new ProjectExpression(fp1, 0, 0, null);
+        ConstantExpression fc1 = new ConstantExpression(fp1, 
+            new Integer(0), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        new EqualExpression(fp1, fy1, fc1);
+        LOFilter D1 = new LOFilter(lp1, fp1);
+        LogicalSchema cschema = new LogicalSchema();
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        D1.neverUseForRealSetSchema(cschema);
+        lp1.add(D1);
+        lp1.connect(A1, D1);
+        
+        LogicalPlan lp2 = new LogicalPlan();
+        LOLoad A2 = new LOLoad(new FileSpec("/abc",
+            new FuncSpec("/foo", new String[] {"x", "z"})), null, lp2, null);
+        lp2.add(A2);
+        
+        LogicalExpressionPlan fp2 = new LogicalExpressionPlan();
+        ProjectExpression fy2 = new ProjectExpression(fp2, 0, 0, null);
+        ConstantExpression fc2 = new ConstantExpression(fp2, 
+            new Integer(0), new LogicalFieldSchema(null, null, DataType.INTEGER));
+        new EqualExpression(fp2, fy2, fc2);
+        LOFilter D2 = new LOFilter(lp2, fp2);
+        D2.neverUseForRealSetSchema(cschema);
+        lp2.add(D2);
+        lp2.connect(A2, D2);
+        
+        assertFalse(D1.isEqual(D2));
+    }
+    
+ 
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanPruneMapKeys.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanPruneMapKeys.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanPruneMapKeys.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanPruneMapKeys.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,282 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import junit.framework.TestCase;
+
+import org.apache.pig.ExecType;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
+import org.apache.pig.newplan.logical.optimizer.ProjectionPatcher;
+import org.apache.pig.newplan.logical.optimizer.SchemaPatcher;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.rules.ColumnMapKeyPrune;
+import org.apache.pig.newplan.logical.rules.MapKeysPruneHelper;
+import org.apache.pig.newplan.optimizer.PlanOptimizer;
+import org.apache.pig.newplan.optimizer.PlanTransformListener;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+public class TestNewPlanPruneMapKeys extends TestCase {
+    PigContext pc = new PigContext(ExecType.LOCAL, new Properties());
+    
+    private org.apache.pig.newplan.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{
+        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
+        visitor.visit();
+        org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+        
+        try {
+            // run filter rule
+            Set<Rule> s = new HashSet<Rule>();
+            List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+            ls.add(s);
+            // Add the PruneMap Filter
+            Rule r = new ColumnMapKeyPrune("PruneMapKeys");
+            s.add(r);            
+            
+            printPlan((org.apache.pig.newplan.logical.relational.LogicalPlan)newPlan);
+            
+            // Run the optimizer
+            MyPlanOptimizer optimizer = new MyPlanOptimizer(newPlan, ls, 3);
+            optimizer.addPlanTransformListener(new ProjectionPatcher());
+            optimizer.addPlanTransformListener(new SchemaPatcher());
+            optimizer.optimize();
+            
+            return newPlan;
+        }catch(Exception e) {
+            throw new VisitorException(e);
+        }
+    }
+    
+    public class MyPlanOptimizer extends PlanOptimizer {
+
+        protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
+                int iterations) {
+            super(p, rs, iterations);           
+        }
+        
+        public void addPlanTransformListener(PlanTransformListener listener) {
+            super.addPlanTransformListener(listener);
+        }
+        
+    }
+        
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (a:map[], b:int, c:float);");
+        lpt.buildPlan("b = filter a by a#'name' == 'hello';");
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");        
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 1, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue(annotation == null || annotation.isEmpty() );
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan2() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (a:map[], b:int, c:float);");
+        lpt.buildPlan("b = filter a by a#'name' == 'hello';");
+        lpt.buildPlan("c = foreach b generate b,c;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        assertEquals( 1, newLogicalPlan.getSources().size() );
+        LOLoad load = (LOLoad) newLogicalPlan.getSources().get(0);
+        Map<Long,Set<String>> annotation = 
+            (Map<Long, Set<String>>) load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        assertTrue( annotation != null );
+        assertEquals( 1, annotation.keySet().size() );
+        Integer[] keySet = annotation.keySet().toArray( new Integer[0] );
+        assertEquals( new Integer(0), keySet[0] );
+        Set<String> keys = annotation.get(0);
+        assertEquals( 1, keys.size() );
+        assertEquals( "name", keys.toArray( new String[0] )[0] );            
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan3() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (a:map[], b:int, c:float);");
+        lpt.buildPlan("b = filter a by a#'name' == 'hello';");
+        lpt.buildPlan("c = foreach b generate a#'age',b,c;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        assertEquals( 1, newLogicalPlan.getSources().size() );
+        LOLoad load = (LOLoad) newLogicalPlan.getSources().get(0);
+        Map<Long,Set<String>> annotation = 
+            (Map<Long, Set<String>>) load.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+        assertTrue( annotation != null );
+        assertEquals( 1, annotation.keySet().size() );
+        Integer[] keySet = annotation.keySet().toArray( new Integer[0] );
+        assertEquals( new Integer(0), keySet[0] );
+        Set<String> keys = annotation.get(0);
+        assertEquals( 2, keys.size() );
+        assertTrue( keys.contains("name") );
+        assertTrue( keys.contains("age"));
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan4() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (a:map[], b:int, c:float);");
+        lpt.buildPlan("b = filter a by a#'name' == 'hello';");
+        lpt.buildPlan("c = foreach b generate a#'age',a,b,c;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 1, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue(annotation == null || annotation.isEmpty() );
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan5() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt' as (a:chararray, b:int, c:float);");
+        lpt.buildPlan("b = filter a by a == 'hello';");
+        lpt.buildPlan("c = foreach b generate a,b,c;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 1, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue(annotation == null );
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan6() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("b = filter a by $0 == 'hello';");
+        lpt.buildPlan("c = foreach b generate $0,$1,$2;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 1, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue(annotation == null );
+        }
+    }
+    
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan7() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("a1 = load 'b.txt' as (a:map[],b:int, c:float);" );
+        lpt.buildPlan("b = join a by $0, a1 by a#'name';");
+        lpt.buildPlan("c = foreach b generate $0,$1,$2;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        printPlan(plan);
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 2, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue( annotation == null || annotation.isEmpty() );
+        }
+    }
+
+    @SuppressWarnings("unchecked")
+    public void testSimplePlan8() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester(pc);
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("a1 = load 'b.txt' as (a:chararray,b:int, c:float);" );
+        lpt.buildPlan("b = join a by $0, a1 by a;");
+        lpt.buildPlan("c = foreach b generate $0,$1,$2;" );
+        LogicalPlan plan = lpt.buildPlan("store c into 'empty';");        
+        
+        printPlan(plan);
+        
+        org.apache.pig.newplan.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        
+        List<Operator> sources = newLogicalPlan.getSources();
+        assertEquals( 2, sources.size() );
+        for( Operator source : sources ) {
+            Map<Long,Set<String>> annotation = 
+                (Map<Long, Set<String>>) source.getAnnotation(MapKeysPruneHelper.REQUIRED_MAPKEYS);
+            assertTrue( annotation == null || annotation.isEmpty() );
+        }
+    }
+    
+    public void printPlan(org.apache.pig.newplan.logical.relational.LogicalPlan logicalPlan ) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(out);
+        LogicalPlanPrinter pp = new LogicalPlanPrinter(logicalPlan,ps);
+        pp.visit();
+        System.err.println(out.toString());
+    }
+    
+    public void printPlan(LogicalPlan logicalPlan) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(out);
+        logicalPlan.explain(ps, "text", true);
+        System.err.println(out.toString());
+    }
+    
+    public void printPlan(PhysicalPlan physicalPlan) throws Exception {
+        ByteArrayOutputStream out = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(out);
+        physicalPlan.explain(ps, "text", true);
+        System.err.println(out.toString());
+    }
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanRule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanRule.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanRule.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanRule.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,367 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.util.List;
+
+import org.apache.pig.newplan.BaseOperatorPlan;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.PlanVisitor;
+import org.apache.pig.newplan.optimizer.Rule;
+import org.apache.pig.newplan.optimizer.Transformer;
+
+import junit.framework.TestCase;
+
+public class TestNewPlanRule extends TestCase {
+
+    private static class SillyRule extends Rule {
+    
+        public SillyRule(String n, OperatorPlan p) {
+            super(n, p);            
+        }
+        
+        @Override
+        public Transformer getNewTransformer() {			
+            return null;
+        }
+
+        @Override
+        protected OperatorPlan buildPattern() {
+            // TODO Auto-generated method stub
+            return null;
+        }
+        
+    }
+    
+    private static class SillyPlan extends BaseOperatorPlan {
+            
+        SillyPlan() {
+            super();
+        }
+
+        @Override
+        public boolean isEqual(OperatorPlan other) {
+            return false;
+        }
+
+    }
+    
+    private static class OP extends Operator {
+        OP(String n, OperatorPlan p) {
+            super(n, p);           
+        }
+
+        public void accept(PlanVisitor v) {
+            
+        }
+
+        @Override
+        public boolean isEqual(Operator operator) {
+            return false;
+        }
+    }
+    
+    private static class OP_Load extends OP {
+        OP_Load(String n, OperatorPlan p) {
+            super(n, p);            
+        }
+    }
+    
+    private static class OP_Filter extends OP {
+        OP_Filter(String n, OperatorPlan p) {
+            super(n, p);            
+        }
+    }
+    
+    private static class OP_Split extends OP {
+        OP_Split(String n, OperatorPlan p) {
+            super(n, p);            
+        }
+    }
+    
+    private static class OP_Store extends OP {
+        OP_Store(String n, OperatorPlan p) {
+            super(n, p);            
+        }
+    }
+    
+    private static class OP_Join extends OP {
+        OP_Join(String n, OperatorPlan p) {
+            super(n, p);            
+        }
+    }
+
+    
+    OperatorPlan plan = null;
+    Operator join;
+    
+    public void setUp() {
+        plan = new SillyPlan();
+        Operator l1 = new OP_Load("p1", plan);
+        plan.add(l1);
+        Operator l2 = new OP_Load("p2", plan);
+        plan.add(l2);
+        Operator j1 = new OP_Join("j1", plan);
+        plan.add(j1);
+        Operator f1 = new OP_Filter("f1", plan);
+        plan.add(f1);
+        Operator f2 = new OP_Filter("f2", plan);
+        plan.add(f2);
+        Operator t1 = new OP_Split("t1",plan);
+        plan.add(t1);
+        Operator f3 = new OP_Filter("f3", plan);
+        plan.add(f3);
+        Operator f4 = new OP_Filter("f4", plan);
+        plan.add(f4);
+        Operator s1 = new OP_Store("s1", plan);
+        plan.add(s1);
+        Operator s2 = new OP_Store("s2", plan);
+        plan.add(s2);
+        
+        // load --|-join - filter - filter - split |- filter - store
+        // load --|                                |- filter - store
+        plan.connect(l1, j1);
+        plan.connect(l2, j1);
+        plan.connect(j1, f1);
+        plan.connect(f1, f2);
+        plan.connect(f2, t1);
+        plan.connect(t1, f3);
+        plan.connect(t1, f4);
+        plan.connect(f3, s1);
+        plan.connect(f4, s2); 
+        
+        join = j1;
+    }
+    
+    
+    public void testMultiNode() throws Exception {    
+        //         load --|-join - filter - filter - split |- filter - store
+        //         load --|      
+        // load -- filter-|
+        Operator l3 = new OP_Load("p3", plan);
+        Operator f5 = new OP_Filter("f5", plan);
+        plan.add(l3);
+        plan.add(f5);
+        plan.connect(l3, f5);
+            
+         plan.connect(f5, join);
+       
+        
+         OperatorPlan pattern = new SillyPlan();
+         Operator op1 = new OP_Load("mmm1", pattern);
+         Operator op2 = new OP_Filter("mmm2", pattern);
+         Operator op3 = new OP_Join("mmm3", pattern);
+         pattern.add(op1);
+         pattern.add(op2);
+         pattern.add(op3);
+         pattern.connect(op1, op3);
+         pattern.connect(op2, op3);
+         
+         Rule r = new SillyRule("basic", pattern);
+         List<OperatorPlan> l = r.match(plan);
+         assertEquals(1, l.size());
+         OperatorPlan match = l.get(0);
+         assertEquals(match.size(), 3);
+         assertEquals(match.getSinks().size(), 1);
+         assertEquals(match.getSinks().get(0), join);
+         
+         assertEquals(match.getSources().size(), 2);
+         assertTrue(match.getSources().get(0).getClass().equals(OP_Load.class) || match.getSources().get(0).equals(f5) );
+         assertTrue(match.getSources().get(1).getClass().equals(OP_Load.class) || match.getSources().get(1).equals(f5) );
+         assertNotSame(match.getSources().get(0), match.getSources().get(1));
+    }
+    
+    public void testSingleNodeMatch() {
+        // search for Load 
+        OperatorPlan pattern = new SillyPlan();
+        pattern.add(new OP_Load("mmm", pattern));
+        
+        Rule r = new SillyRule("basic", pattern);
+        List<OperatorPlan> l = r.match(plan);
+        assertEquals(l.size(), 2);
+        
+        Operator m1 = l.get(0).getSources().get(0);
+        assertTrue(m1.getName().equals("p1") || m1.getName().equals("p2"));
+        assertEquals(l.get(0).size(), 1);
+        
+        Operator m2 = l.get(1).getSources().get(0);
+        assertTrue(m2.getName().equals("p1") || m2.getName().equals("p2"));
+        assertEquals(l.get(1).size(), 1);
+        assertNotSame(m1.getName(), m2.getName());
+       
+        // search for filter
+        pattern = new SillyPlan();
+        pattern.add(new OP_Filter("mmm",pattern));
+        r = new SillyRule("basic", pattern);
+        l = r.match(plan);
+        assertEquals(l.size(), 4);
+        
+        m1 = l.get(0).getSources().get(0);
+        assertTrue(m1.getName().equals("f1") || m1.getName().equals("f2") 
+                || m1.getName().equals("f3") || m1.getName().equals("f4"));
+        assertEquals(l.get(0).size(), 1);
+        
+        m2 = l.get(1).getSources().get(0);
+        assertTrue(m1.getName().equals("f1") || m1.getName().equals("f2") 
+                || m1.getName().equals("f3") || m1.getName().equals("f4"));
+        assertEquals(l.get(1).size(), 1);
+        assertNotSame(m1.getName(), m2.getName());
+        
+        // search for store
+        pattern = new SillyPlan();
+        pattern.add(new OP_Store("mmm",pattern));
+        r = new SillyRule("basic", pattern);
+        l = r.match(plan);
+        assertEquals(l.size(), 2);
+        
+        m1 = l.get(0).getSources().get(0);
+        assertTrue(m1.getName().equals("s1") || m1.getName().equals("s2"));
+        assertEquals(l.get(0).size(), 1);
+        
+        m2 = l.get(1).getSources().get(0);
+        assertTrue(m2.getName().equals("s1") || m2.getName().equals("s2"));
+        assertEquals(l.get(1).size(), 1);
+        assertNotSame(m1.getName(), m2.getName());
+        
+        // search for split
+        pattern = new SillyPlan();
+        pattern.add(new OP_Split("mmm",pattern));
+        r = new SillyRule("basic", pattern);
+        l = r.match(plan);
+        assertEquals(l.size(), 1);
+        
+        m1 = l.get(0).getSources().get(0);
+        assertTrue(m1.getName().equals("t1"));
+        assertEquals(l.get(0).size(), 1);
+        
+        // search for join
+        pattern = new SillyPlan();
+        pattern.add(new OP_Join("mmm",pattern));
+        r = new SillyRule("basic", pattern);
+        l = r.match(plan);
+        assertEquals(l.size(), 1);
+        
+        m1 = l.get(0).getSources().get(0);
+        assertTrue(m1.getName().equals("j1"));
+        assertEquals(l.get(0).size(), 1);
+      
+    }
+    
+    public void testTwoNodeMatch() {
+        // search for 2 Loads at the same time 
+        OperatorPlan pattern = new SillyPlan();
+        pattern.add(new OP_Load("mmm1", pattern));
+        pattern.add(new OP_Load("mmm2", pattern));
+        
+        Rule r = new SillyRule("basic", pattern);
+        List<OperatorPlan> l = r.match(plan);
+        assertEquals(l.size(), 1);
+        
+        assertEquals(l.get(0).getSources().size(), 2);
+        assertEquals(l.get(0).getSinks().size(), 2);
+        assertEquals(l.get(0).size(), 2);
+        
+        Operator m1 = l.get(0).getSources().get(0);
+        assertTrue(m1.getName().equals("p1") || m1.getName().equals("p2"));
+        Operator m2 = l.get(0).getSources().get(1);
+        assertTrue(m2.getName().equals("p1") || m2.getName().equals("p2"));       
+        assertNotSame(m1.getName(), m2.getName());
+       
+        
+        // search for join then filter
+        pattern = new SillyPlan();
+        Operator s1 = new OP_Join("mmm1", pattern);
+        Operator s2 = new OP_Filter("mmm2", pattern);
+        pattern.add(s1);
+        pattern.add(s2);        
+        pattern.connect(s1, s2);
+        
+        r = new SillyRule("basic", pattern);
+        l = r.match(plan);
+        assertEquals(l.size(), 1);
+        
+        assertEquals(l.get(0).getSources().size(), 1);
+        assertEquals(l.get(0).getSinks().size(), 1);
+        assertEquals(l.get(0).size(), 2);
+        
+        m1 = l.get(0).getSources().get(0);
+        assertTrue(m1.getName().equals("j1"));
+        m2 = l.get(0).getSinks().get(0);
+        assertTrue(m2.getName().equals("f1"));       
+       
+  
+        // search for filter, then store
+        pattern = new SillyPlan();
+        s1 = new OP_Filter("mmm1", pattern);
+        s2 = new OP_Store("mmm2", pattern);        
+        pattern.add(s1);
+        pattern.add(s2);           
+        pattern.connect(s1, s2);        
+        
+        r = new SillyRule("basic", pattern);
+        l = r.match(plan);
+        assertEquals(2, l.size());
+        
+        assertEquals(l.get(0).getSources().size(), 1);
+        assertEquals(l.get(0).getSinks().size(), 1);                     
+        
+        // search for 2 loads, then join
+        pattern = new SillyPlan();
+        s1 = new OP_Load("mmm1", pattern);
+        s2 = new OP_Load("mmm2", pattern);
+        Operator s3 = new OP_Join("jjj", pattern);
+        pattern.add(s1);
+        pattern.add(s2);
+        pattern.add(s3);
+        pattern.connect(s1, s3);
+        pattern.connect(s2, s3);
+        
+        r = new SillyRule("basic", pattern);
+        l = r.match(plan);
+        assertEquals(l.size(), 1);
+        
+        // search for split then 2 filters
+        pattern = new SillyPlan();
+        s1 = new OP_Split("mmm1", pattern);
+        s2 = new OP_Filter("mmm2", pattern);
+        s3 = new OP_Filter("mmm3", pattern);
+        pattern.add(s1);
+        pattern.add(s2);        
+        pattern.add(s3);
+        pattern.connect(s1, s2);
+        pattern.connect(s1, s3);
+        
+        r = new SillyRule("basic", pattern);
+        l = r.match(plan);
+        assertEquals(1, l.size());
+        
+        assertEquals(l.get(0).getSources().size(), 1);
+        assertEquals(l.get(0).getSinks().size(), 2);
+        assertEquals(l.get(0).size(), 3);
+        
+        m1 = l.get(0).getSources().get(0);
+        assertTrue(m1.getName().equals("t1"));
+        m2 = l.get(0).getSinks().get(0);
+        assertTrue(m2.getName().equals("f3") || m2.getName().equals("f4"));    
+        m2 = l.get(0).getSinks().get(1);
+        assertTrue(m2.getName().equals("f3") || m2.getName().equals("f4"));    
+    }
+   
+}

Modified: hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java Wed Aug  4 17:46:42 2010
@@ -33,9 +33,9 @@ import junit.framework.Assert;
 import org.apache.pig.ExecType;
 import org.apache.pig.PigRunner;
 import org.apache.pig.PigRunner.ReturnCode;
-import org.apache.pig.experimental.plan.Operator;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
+import org.apache.pig.newplan.Operator;
 import org.apache.pig.tools.pigstats.JobStats;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigProgressNotificationListener;

Modified: hadoop/pig/trunk/test/org/apache/pig/test/Util.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/Util.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/Util.java (original)
+++ hadoop/pig/trunk/test/org/apache/pig/test/Util.java Wed Aug  4 17:46:42 2010
@@ -64,13 +64,13 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.experimental.logical.optimizer.PlanPrinter;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.parser.ParseException;
 import org.apache.pig.impl.logicalLayer.parser.QueryParser;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanPrinter;
 import org.apache.pig.tools.grunt.Grunt;
 import org.apache.pig.tools.grunt.GruntParser;
 
@@ -573,10 +573,10 @@ public class Util {
         createInputFile(FileSystem.get(conf), fileName, input); 
     }
     
-    public static void printPlan(org.apache.pig.experimental.logical.relational.LogicalPlan logicalPlan ) throws Exception {
+    public static void printPlan(org.apache.pig.newplan.logical.relational.LogicalPlan logicalPlan ) throws Exception {
         ByteArrayOutputStream out = new ByteArrayOutputStream();
         PrintStream ps = new PrintStream(out);
-        PlanPrinter pp = new PlanPrinter(logicalPlan,ps);
+        LogicalPlanPrinter pp = new LogicalPlanPrinter(logicalPlan,ps);
         pp.visit();
         System.err.println(out.toString());
     }