You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ga...@apache.org on 2010/02/11 23:12:43 UTC

svn commit: r909165 [5/6] - in /hadoop/pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/logical/ src/org/apache/pig/experimental/logical/expression/ src/org/apache/pig/experimental/logica...

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.*;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.*;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LOStore;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.rules.MergeFilter;
+import org.apache.pig.experimental.logical.rules.PushUpFilter;
+import org.apache.pig.experimental.logical.rules.SplitFilter;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.optimizer.PlanOptimizer;
+import org.apache.pig.experimental.plan.optimizer.PlanTransformListener;
+import org.apache.pig.experimental.plan.optimizer.Rule;
+import org.apache.pig.impl.util.MultiMap;
+
+import junit.framework.TestCase;
+
+public class TestExperimentalFilterRule extends TestCase {
+
+    LogicalPlan plan = null;
+    LogicalRelationalOperator load1 = null;
+    LogicalRelationalOperator load2 = null;
+    LogicalRelationalOperator filter = null;
+    LogicalRelationalOperator join = null;
+    LogicalRelationalOperator store = null;    
+    
+    private void prep() {
+        plan = new LogicalPlan();
+        LogicalSchema schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER));
+        schema.addField(new LogicalSchema.LogicalFieldSchema("name", null, DataType.CHARARRAY));
+        schema.addField(new LogicalSchema.LogicalFieldSchema("age", null, DataType.INTEGER));    
+        schema.getField(0).uid = 1;
+        schema.getField(1).uid = 2;
+        schema.getField(2).uid = 3;
+        LogicalRelationalOperator l1 = new LOLoad(null, schema, plan);
+        l1.setAlias("A");
+        plan.add(l1);
+
+        schema = new LogicalSchema();
+        schema.addField(new LogicalSchema.LogicalFieldSchema("id", null, DataType.INTEGER));
+        schema.addField(new LogicalSchema.LogicalFieldSchema("dept", null, DataType.INTEGER));
+        schema.addField(new LogicalSchema.LogicalFieldSchema("salary", null, DataType.FLOAT));    
+        schema.getField(0).uid = 4;
+        schema.getField(1).uid = 5;
+        schema.getField(2).uid = 6;
+        LogicalRelationalOperator l2 = new LOLoad(null, schema, plan);
+        l2.setAlias("B");
+        plan.add(l2);
+        
+        MultiMap<Integer, LogicalExpressionPlan> joinPlans = new MultiMap<Integer, LogicalExpressionPlan>();
+        LogicalExpressionPlan p1 = new LogicalExpressionPlan();
+        ProjectExpression lp1 = new ProjectExpression(p1, DataType.CHARARRAY, 0, 1);
+        p1.add(lp1);
+        joinPlans.put(0, p1);
+        
+        LogicalExpressionPlan p2 = new LogicalExpressionPlan();
+        ProjectExpression lp2 = new ProjectExpression(p2, DataType.INTEGER, 1, 1);
+        p2.add(lp2);
+        joinPlans.put(1, p2);
+     
+        LogicalRelationalOperator j1 = new LOJoin(plan, joinPlans, LOJoin.JOINTYPE.HASH, new boolean[]{true, true});
+        j1.setAlias("C");
+        plan.add(j1);
+        
+        
+        // build an expression with no AND
+        LogicalExpressionPlan p3 = new LogicalExpressionPlan();
+        LogicalExpression lp3 = new ProjectExpression(p3, DataType.INTEGER, 0, 2);
+        LogicalExpression cont = new ConstantExpression(p3, DataType.INTEGER, new Integer(3));
+        p3.add(lp3);
+        p3.add(cont);       
+        LogicalExpression eq = new EqualExpression(p3, lp3, cont);        
+        
+        LogicalRelationalOperator f1 = new LOFilter(plan, p3);
+        f1.setAlias("D");
+        plan.add(f1);
+        
+        LogicalRelationalOperator s1 = new LOStore(plan);
+        plan.add(s1);       
+        
+        // load --|-join - filter - store
+        // load --|   
+        plan.connect(l1, j1);
+        plan.connect(l2, j1);
+        plan.connect(j1, f1);        
+        plan.connect(f1, s1);      
+        
+        try {
+            lp1.setUid(j1);
+            lp2.setUid(j1);
+            lp3.setUid(f1);
+        }catch(Exception e) {
+            
+        }
+        
+        filter = f1;
+        store = s1;
+        join = j1;
+        load1 = l1;
+        load2 = l2;
+    }
+    
+    public void testFilterRule() throws Exception  {
+        prep();
+        // run split filter rule
+        Rule r = new SplitFilter("SplitFilter");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+        
+        assertEquals(plan.getPredecessors(filter).get(0), join);
+        assertEquals(plan.getSuccessors(filter).get(0), store);
+        
+        // run push up filter rule
+        r = new PushUpFilter("PushUpFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+        
+        // the filter should be moved up to be after load
+        assertEquals(plan.getSuccessors(load1).get(0), filter);
+        assertEquals(plan.getSuccessors(filter).get(0), join);
+        assertEquals(plan.getSuccessors(join).get(0), store);
+        
+        // run merge filter rule
+        r = new MergeFilter("MergeFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+        
+        // the filter should the same as before, nothing to merge
+        assertEquals(plan.getSuccessors(load1).get(0), filter);
+        assertEquals(plan.getSuccessors(filter).get(0), join);
+        assertEquals(plan.getSuccessors(join).get(0), store);
+    }
+        
+    // build an expression with 1 AND, it should split into 2 filters
+    public void testFilterRuleWithAnd() throws Exception  {
+        prep();
+        
+        LogicalExpressionPlan p4 = new LogicalExpressionPlan();        
+        LogicalExpression lp3 = new ProjectExpression(p4, DataType.INTEGER, 0, 2);
+        LogicalExpression cont = new ConstantExpression(p4, DataType.INTEGER, new Integer(3));
+        p4.add(lp3);
+        p4.add(cont);
+        LogicalExpression eq = new EqualExpression(p4, lp3, cont);      
+      
+        LogicalExpression lp4 = new ProjectExpression(p4, DataType.FLOAT, 0, 5);
+        LogicalExpression cont2 = new ConstantExpression(p4, DataType.FLOAT, new Float(100));
+        p4.add(lp4);
+        p4.add(cont2);
+        LogicalExpression eq2 = new EqualExpression(p4, lp4, cont2);        
+    
+        LogicalExpression and = new AndExpression(p4, eq, eq2);        
+        
+        lp3.setUid(filter);
+        lp4.setUid(filter);
+        
+        ((LOFilter)filter).setFilterPlan(p4);
+        
+        // run split filter rule
+        Rule r = new SplitFilter("SplitFilter");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        PlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+        
+        assertEquals(plan.getPredecessors(filter).get(0), join);
+        Operator next = plan.getSuccessors(filter).get(0);
+        assertEquals(LOFilter.class, next.getClass());        
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+        
+        // run push up filter rule
+        r = new PushUpFilter("PushUpFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+        
+        // both filters should be moved up to be after each load
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        assertEquals(plan.getSuccessors(join).get(0), store);
+        
+        // run merge filter rule
+        r = new MergeFilter("MergeFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+        
+        // the filters should the same as before, nothing to merge
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        assertEquals(plan.getSuccessors(join).get(0), store);
+    }
+    
+    public void testFilterRuleWith2And() throws Exception  {
+        prep();
+        // build an expression with 2 AND, it should split into 3 filters
+        LogicalExpressionPlan p5 = new LogicalExpressionPlan();
+        
+       
+        LogicalExpression lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 2);
+        LogicalExpression cont = new ConstantExpression(p5, DataType.INTEGER, new Integer(3));
+        p5.add(lp3);
+        p5.add(cont);       
+        LogicalExpression eq = new EqualExpression(p5, lp3, cont);
+        
+        LogicalExpression lp4 = new ProjectExpression(p5, DataType.INTEGER, 0, 3);
+        LogicalExpression cont2 = new ConstantExpression(p5, DataType.INTEGER, new Integer(3));        
+        p5.add(lp4);
+        p5.add(cont2);
+        LogicalExpression eq2 = new EqualExpression(p5, lp4, cont2);        
+        
+        lp3.setUid(filter);
+        lp4.setUid(filter);
+        
+        LogicalExpression and1 = new AndExpression(p5, eq, eq2);
+        
+       
+        lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 0);
+        lp4 = new ProjectExpression(p5, DataType.INTEGER, 0, 3);     
+        lp3.setUid(filter);
+        lp4.setUid(filter);
+        p5.add(lp3);
+        p5.add(lp4);   
+        eq2 = new EqualExpression(p5, lp3, lp4);        
+              
+        LogicalExpression and2 = new AndExpression(p5, and1, eq2);        
+        
+        ((LOFilter)filter).setFilterPlan(p5);
+        
+        Rule r = new SplitFilter("SplitFilter");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        MyPlanTransformListener listener = new MyPlanTransformListener();
+        optimizer.addPlanTransformListener(listener);
+        optimizer.optimize();
+        
+        assertEquals(plan.getPredecessors(filter).get(0), join);
+        Operator next = plan.getSuccessors(filter).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+        
+        OperatorPlan transformed = listener.getTransformed();
+        assertEquals(transformed.size(), 3);
+        
+        // run push up filter rule
+        r = new PushUpFilter("PushUpFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        listener = new MyPlanTransformListener();
+        optimizer.addPlanTransformListener(listener);
+        optimizer.optimize();
+        
+        // 2 filters should be moved up to be after each load, and one filter should remain
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        next = plan.getSuccessors(join).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(next.getClass(), LOStore.class);
+        
+        transformed = listener.getTransformed();
+        assertEquals(transformed.size(), 4);
+        assertEquals(transformed.getSinks().get(0).getClass(), LOFilter.class);
+        assertEquals(transformed.getSources().get(0).getClass(), LOLoad.class);
+        
+        // run merge filter rule
+        r = new MergeFilter("MergeFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        listener = new MyPlanTransformListener();
+        optimizer.addPlanTransformListener(listener);
+        optimizer.optimize();
+        
+        // the filters should the same as before, nothing to merge
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        next = plan.getSuccessors(join).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(next.getClass(), LOStore.class);
+        
+        transformed = listener.getTransformed();
+        assertNull(transformed);
+    }   
+    
+    public void testFilterRuleWith2And2() throws Exception  {
+        prep();
+        // build an expression with 2 AND, it should split into 3 filters
+        LogicalExpressionPlan p5 = new LogicalExpressionPlan();
+        
+        LogicalExpression lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 2);
+        lp3.setUid(filter);
+        LogicalExpression cont = new ConstantExpression(p5, DataType.INTEGER, new Integer(3));
+        p5.add(lp3);
+        p5.add(cont);
+        LogicalExpression eq = new EqualExpression(p5, lp3, cont);      
+        
+        lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 0);
+        LogicalExpression lp4 = new ProjectExpression(p5, DataType.INTEGER, 0, 3);        
+        p5.add(lp4);
+        p5.add(lp3);
+        lp3.setUid(filter);
+        lp4.setUid(filter);
+        LogicalExpression eq2 = new EqualExpression(p5, lp3, lp4);
+        
+        LogicalExpression and1 = new AndExpression(p5, eq, eq2);
+        
+        lp3 = new ProjectExpression(p5, DataType.INTEGER, 0, 2);
+        lp4 = new ProjectExpression(p5, DataType.FLOAT, 0, 5);        
+        p5.add(lp3);
+        p5.add(lp4);
+        lp3.setUid(filter);
+        lp4.setUid(filter);
+        eq2 = new EqualExpression(p5, lp3, lp4);
+        
+        LogicalExpression and2 = new AndExpression(p5, and1, eq2);    
+        
+        ((LOFilter)filter).setFilterPlan(p5);
+        
+        Rule r = new SplitFilter("SplitFilter");
+        Set<Rule> s = new HashSet<Rule>();
+        s.add(r);
+        List<Set<Rule>> ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        MyPlanOptimizer optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+        
+        assertEquals(plan.getPredecessors(filter).get(0), join);
+        Operator next = plan.getSuccessors(filter).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(LOStore.class, next.getClass());
+        
+        // run push up filter rule
+        r = new PushUpFilter("PushUpFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        optimizer.optimize();
+        
+        // 1 filter should be moved up to be after a load, and 2 filters should remain
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(next, join);     
+        
+        next = plan.getSuccessors(join).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+                
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(next.getClass(), LOStore.class);
+        
+        // run merge filter rule
+        r = new MergeFilter("MergeFilter");
+        s = new HashSet<Rule>();
+        s.add(r);
+        ls = new ArrayList<Set<Rule>>();
+        ls.add(s);
+        optimizer = new MyPlanOptimizer(plan, ls, 3);
+        MyPlanTransformListener listener = new MyPlanTransformListener();
+        optimizer.addPlanTransformListener(listener);
+        optimizer.optimize();
+        
+        // the 2 filters after join should merge
+        next = plan.getSuccessors(load1).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        assertEquals(plan.getSuccessors(next).get(0), join);
+        
+        next = plan.getSuccessors(load2).get(0);
+        assertEquals(next, join);        
+        
+        next = plan.getSuccessors(join).get(0);
+        assertEquals(next.getClass(), LOFilter.class);
+        
+        next = plan.getSuccessors(next).get(0);
+        assertEquals(next.getClass(), LOStore.class);
+        
+        OperatorPlan transformed = listener.getTransformed();
+        assertEquals(transformed.size(), 2);
+    }   
+    
+    public class MyPlanOptimizer extends PlanOptimizer {
+
+        protected MyPlanOptimizer(OperatorPlan p, List<Set<Rule>> rs,
+                int iterations) {
+            super(p, rs, iterations);			
+        }
+        
+        public void addPlanTransformListener(PlanTransformListener listener) {
+            super.addPlanTransformListener(listener);
+        }
+        
+    }
+    
+    public class MyPlanTransformListener implements PlanTransformListener {
+
+        private OperatorPlan tp;
+
+        @Override
+        public void transformed(OperatorPlan fp, OperatorPlan tp)
+                throws IOException {
+            this.tp = tp;
+        }
+        
+        public OperatorPlan getTransformed() {
+            return tp;
+        }
+    }
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionVisitor;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.optimizer.AllExpressionVisitor;
+import org.apache.pig.experimental.logical.optimizer.AllSameVisitor;
+import org.apache.pig.experimental.logical.optimizer.ProjectionPatcher;
+import org.apache.pig.experimental.logical.optimizer.SchemaPatcher;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE;
+import org.apache.pig.experimental.plan.DepthFirstWalker;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.impl.util.MultiMap;
+import org.junit.Before;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Tests for PlanTransformListerns
+ *
+ */
+public class TestExperimentalListener extends TestCase {
+    
+    private LogicalPlan lp;
+    private LogicalPlan changedPlan;
+
+    /* (non-Javadoc)
+     * @see junit.framework.TestCase#setUp()
+     */
+    @Before
+    protected void setUp() throws Exception {
+        super.setUp();
+        
+        // Build a plan that looks like it has just been transformed
+        // It is roughly the logical plan for
+        // A = load 'bla' as (x);
+        // B = load 'morebla' as (y);
+        // C = join A on x, B on y;
+        // D = filter C by y > 0;
+        // The plan is built with the filter having been pushed above the join
+        // but the listners not yet having been called.
+        // A = load
+        lp = new LogicalPlan();
+        LogicalSchema aschema = new LogicalSchema();
+        aschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        aschema.getField(0).uid = 1;
+        LOLoad A = new LOLoad(null, aschema, lp);
+        lp.add(A);
+        
+        // B = load
+        LogicalSchema bschema = new LogicalSchema();
+        bschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        bschema.getField(0).uid = 2;
+        LOLoad B = new LOLoad(null, bschema, lp);
+        lp.add(B);
+        
+        // C = join
+        LogicalSchema cschema = new LogicalSchema();
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "x", null, DataType.INTEGER));
+        cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            "y", null, DataType.INTEGER));
+        cschema.getField(0).uid = 1;
+        cschema.getField(1).uid = 2;
+        LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+        ProjectExpression x = new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0);
+        x.neverUseForRealSetUid(1);
+        LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+        ProjectExpression y = new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0);
+        y.neverUseForRealSetUid(2);
+        MultiMap<Integer, LogicalExpressionPlan> mm = 
+            new MultiMap<Integer, LogicalExpressionPlan>();
+        mm.put(0, aprojplan);
+        mm.put(1, bprojplan);
+        LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true});
+        C.neverUseForRealSetSchema(cschema);
+        // Don't add it to the plan quite yet
+        
+        // D = filter
+        LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1);
+        fy.neverUseForRealSetUid(2);
+        ConstantExpression fc = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(0));
+        new EqualExpression(filterPlan, fy, fc);
+        LOFilter D = new LOFilter(lp, filterPlan);
+        D.neverUseForRealSetSchema(cschema);
+        // Connect D to B, since the transform has happened.
+        lp.add(B, D, (LogicalRelationalOperator)null);
+        
+        // Now add in C, connected to A and D.
+        lp.add(new LogicalRelationalOperator[] {A, D}, C, null);
+        
+        changedPlan = new LogicalPlan();
+        changedPlan.add(D);
+        changedPlan.add(D, C, (LogicalRelationalOperator)null);
+    }
+    
+    private static class SillySameVisitor extends AllSameVisitor {
+        StringBuffer buf = new StringBuffer();
+
+        SillySameVisitor(OperatorPlan plan) {
+            super(plan, new DepthFirstWalker(plan));
+        }
+        
+        @Override
+        protected void execute(LogicalRelationalOperator op) throws IOException {
+            buf.append(op.getName());
+            buf.append(" ");
+        }
+        
+        @Override
+        public String toString() {
+            return buf.toString();
+        }
+        
+    }
+    
+    // Test that the AllSameVisitor calls execute on every node
+    // in the plan.
+    @Test
+    public void testAllSameVisitor() throws IOException {
+        SillySameVisitor v = new SillySameVisitor(lp);
+        v.visit();
+        System.out.println(v.toString());
+        assertTrue("LOLoad LOJoin LOLoad LOFilter ".equals(v.toString()) ||
+            "LOLoad LOFilter LOJoin LOLoad ".equals(v.toString()));
+        
+    }
+    
+    private static class SillyExpressionVisitor extends LogicalExpressionVisitor {
+        StringBuffer buf;
+
+        protected SillyExpressionVisitor(OperatorPlan p, StringBuffer b) {
+            super(p, new DepthFirstWalker(p));
+            buf = b;
+        }
+        
+        @Override
+        public void visitAnd(AndExpression andExpr) throws IOException {
+            buf.append("and ");
+        }
+        
+        @Override
+        public void visitEqual(EqualExpression equal) throws IOException {
+            buf.append("equal ");
+        }
+        
+        @Override
+        public void visitProject(ProjectExpression p) throws IOException {
+            buf.append("proj ");
+        }
+        
+        @Override
+        public void visitConstant(ConstantExpression c) throws IOException {
+            buf.append("const ");
+        }
+    }
+    
+    private static class SillyAllExpressionVisitor extends AllExpressionVisitor {
+        StringBuffer buf = new StringBuffer();
+
+        public SillyAllExpressionVisitor(OperatorPlan plan) {
+            super(plan, new DepthFirstWalker(plan));
+        }
+     
+
+        @Override
+        protected LogicalExpressionVisitor getVisitor(LogicalExpressionPlan expr) {
+            return new SillyExpressionVisitor(expr, buf);
+        }   
+        
+        @Override
+        public String toString() {
+            return buf.toString();
+        }
+    }
+    
+    // Test that the AllExpressionVisitor executes on every
+    // expression in the plan
+    @Test
+    public void testAllExpressionVisitor() throws IOException {
+        SillyAllExpressionVisitor v = new SillyAllExpressionVisitor(lp);
+        v.visit();
+        assertTrue("proj proj equal proj const ".equals(v.toString()) ||
+            "equal proj const proj proj ".equals(v.toString()));
+    }
+    
+    // Test that schemas are patched up after a transform
+    @Test
+    public void testSchemaPatcher() throws IOException {
+        SchemaPatcher patcher = new SchemaPatcher();
+        patcher.transformed(lp, changedPlan);
+        
+        // Check that the filter now has the proper schema.
+        List<Operator> roots = changedPlan.getSources();
+        assertEquals(1, roots.size());
+        LOFilter D = (LOFilter)roots.get(0);
+        assertNotNull(D);
+        LogicalSchema dschema = D.getSchema();
+        assertEquals(1, dschema.size());
+        LogicalSchema.LogicalFieldSchema y = dschema.getField(0);
+        assertEquals("y", y.alias);
+        assertEquals(2, y.uid);
+    }
+    
+    // Test that projections are patched up after a transform
+    @Test
+    public void testProjectionPatcher() throws IOException {
+        ProjectionPatcher patcher = new ProjectionPatcher();
+        patcher.transformed(lp, changedPlan);
+        
+        // Check that the projections in filter are now set properly
+        List<Operator> roots = changedPlan.getSources();
+        assertEquals(1, roots.size());
+        LOFilter D = (LOFilter)roots.get(0);
+        assertNotNull(D);
+        LogicalExpressionPlan filterPlan = D.getFilterPlan();
+        List<Operator> leaves = filterPlan.getSinks();
+        assertEquals(2, leaves.size());
+        ProjectExpression proj = null;
+        for (Operator leaf : leaves) {
+            if (leaf instanceof ProjectExpression) {
+                proj = (ProjectExpression)leaf;
+                break;
+            }
+        }
+        assertNotNull(proj);
+        assertEquals(0, proj.getInputNum());
+        assertEquals(0, proj.getColNum());
+    }
+
+}
+

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,537 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+import java.util.List;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.EqualToExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.GreaterThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.LessThanExpr;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POCast;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.POProject;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POFilter;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POForEach;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POGlobalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPackage;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ConstantExpression;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.experimental.logical.expression.LogicalExpression;
+import org.apache.pig.experimental.logical.optimizer.UidStamper;
+import org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LogicalSchema.LogicalFieldSchema;
+import org.apache.pig.experimental.plan.Operator;
+import org.apache.pig.experimental.plan.OperatorPlan;
+import org.apache.pig.experimental.plan.PlanVisitor;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.test.utils.LogicalPlanTester;
+
+import junit.framework.TestCase;
+
+public class TestExperimentalLogToPhyTranslationVisitor extends TestCase {
+
+    private PhysicalPlan translatePlan(OperatorPlan plan) throws IOException {
+        LogToPhyTranslationVisitor visitor = new LogToPhyTranslationVisitor(plan);
+        visitor.visit();
+        return visitor.getPhysicalPlan();
+    }
+    
+    private org.apache.pig.experimental.logical.relational.LogicalPlan migratePlan(LogicalPlan lp) throws VisitorException{
+        LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(lp);        
+        visitor.visit();
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+        
+        try {
+            UidStamper stamper = new UidStamper(newPlan);
+            stamper.visit();
+            
+            return newPlan;
+        }catch(Exception e) {
+            throw new VisitorException(e);
+        }
+    }
+    
+    protected void setUp() throws Exception {    
+        LogicalExpression.resetNextUid();
+    }
+    
+    public void testSimplePlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt';");
+        lpt.buildPlan("b = filter a by $0==NULL;");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        
+        assertEquals( 3, phyPlan.size() );
+        assertEquals( 1, phyPlan.getRoots().size() );
+        assertEquals( 1, phyPlan.getLeaves().size() );
+        
+        PhysicalOperator load = phyPlan.getRoots().get(0);
+        assertEquals( POLoad.class, load.getClass() );
+        assertTrue(  ((POLoad)load).getLFile().getFileName().contains("d.txt") );
+        
+        // Check for Filter
+        PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
+        assertEquals( POFilter.class, fil.getClass() );
+        PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+        assertEquals( 2, filPlan.getRoots().size() );
+        assertEquals( 1, filPlan.getLeaves().size() );
+        
+        PhysicalOperator eq = filPlan.getLeaves().get(0);
+        assertEquals( EqualToExpr.class, eq.getClass() );
+        
+        PhysicalOperator prj1 = filPlan.getRoots().get(0);
+        assertEquals( POProject.class, prj1.getClass() );
+        assertEquals( 0, ((POProject)prj1).getColumn() );
+        PhysicalOperator constExp = filPlan.getRoots().get(1);
+        assertEquals( ConstantExpression.class, constExp.getClass() );
+        assertEquals( null, ((ConstantExpression)constExp).getValue() );
+        
+        // Check for Store
+        PhysicalOperator stor = phyPlan.getSuccessors(fil).get(0);
+        assertEquals( POStore.class, stor.getClass() );
+        assertTrue(  ((POStore)stor).getSFile().getFileName().contains("empty"));
+    }
+    
+    public void testJoinPlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd1.txt' as (id, c);");
+        lpt.buildPlan("b = load 'd2.txt'as (id, c);");
+        lpt.buildPlan("c = join a by id, b by c;");
+        lpt.buildPlan("d = filter c by a::id==NULL AND b::c==NULL;");        
+        LogicalPlan plan = lpt.buildPlan("store d into 'empty';");
+        
+        // check basics
+        org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migratePlan(plan);
+        PhysicalPlan physicalPlan = translatePlan(newPlan);
+        assertEquals(9, physicalPlan.size());
+        assertEquals(physicalPlan.getRoots().size(), 2);
+        
+        // Check Load and LocalRearrange and GlobalRearrange
+        PhysicalOperator LoR = (PhysicalOperator)physicalPlan.getSuccessors(physicalPlan.getRoots().get(0)).get(0);
+        assertEquals( POLocalRearrange.class, LoR.getClass() );
+        POLocalRearrange Lor = (POLocalRearrange) LoR;
+        PhysicalOperator prj3 = Lor.getPlans().get(0).getLeaves().get(0);
+        assertEquals( POProject.class, prj3.getClass() );
+        assertEquals(0, ((POProject)prj3).getColumn() );
+        PhysicalOperator inp1 = Lor.getInputs().get(0);
+        assertEquals( POLoad.class, inp1.getClass() );
+        assertTrue(  ((POLoad)inp1).getLFile().getFileName().contains("d1.txt") );
+                
+        PhysicalOperator LoR1 = (PhysicalOperator)physicalPlan.getSuccessors(physicalPlan.getRoots().get(1)).get(0);
+        assertEquals( POLocalRearrange.class, LoR1.getClass() );
+        POLocalRearrange Lor1 = (POLocalRearrange) LoR1;
+        PhysicalOperator prj4 = Lor1.getPlans().get(0).getLeaves().get(0);
+        assertEquals( POProject.class, prj4.getClass() );
+        assertEquals(1, ((POProject)prj4).getColumn() );
+        PhysicalOperator inp2 = Lor1.getInputs().get(0);
+        assertEquals( POLoad.class, inp2.getClass() );
+        assertTrue(  ((POLoad)inp2).getLFile().getFileName().contains("d2.txt") );
+        
+        PhysicalOperator GoR = (PhysicalOperator)physicalPlan.getSuccessors(LoR).get(0);
+        assertEquals( POGlobalRearrange.class, GoR.getClass() );
+        
+        PhysicalOperator Pack = (PhysicalOperator)physicalPlan.getSuccessors(GoR).get(0);
+        assertEquals( POPackage.class, Pack.getClass() );
+
+        // Check for ForEach
+        PhysicalOperator ForE = (PhysicalOperator)physicalPlan.getSuccessors(Pack).get(0);
+        assertEquals( POForEach.class, ForE.getClass() );
+        PhysicalOperator prj5 = ((POForEach)ForE).getInputPlans().get(0).getLeaves().get(0);
+        assertEquals( POProject.class, prj5.getClass() );
+        assertEquals( 1, ((POProject)prj5).getColumn() ); 
+        PhysicalOperator prj6 = ((POForEach)ForE).getInputPlans().get(1).getLeaves().get(0);
+        assertEquals( POProject.class, prj6.getClass() );
+        assertEquals( 2, ((POProject)prj6).getColumn() );
+        
+        // Filter Operator
+        PhysicalOperator fil = (PhysicalOperator)physicalPlan.getSuccessors(ForE).get(0);
+        assertEquals( POFilter.class, fil.getClass() );        
+        
+        PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+        List<PhysicalOperator> filRoots = filPlan.getRoots();
+        
+        assertEquals( ConstantExpression.class, filRoots.get(1).getClass() );
+        ConstantExpression ce1 = (ConstantExpression) filRoots.get(1);
+        assertEquals( null, ce1.getValue() ); 
+        assertEquals( ConstantExpression.class, filRoots.get(3).getClass() );
+        ConstantExpression ce2 = (ConstantExpression) filRoots.get(3);
+        assertEquals( null, ce2.getValue() );
+        assertEquals( POProject.class, filRoots.get(0).getClass() );
+        POProject prj1 = (POProject) filRoots.get(0);
+        assertEquals( 3, prj1.getColumn() );
+        assertEquals( POProject.class, filRoots.get(2).getClass() );
+        POProject prj2 = (POProject) filRoots.get(2);
+        assertEquals( 0, prj2.getColumn() );
+
+
+        // Check Store Operator
+        PhysicalOperator stor = (PhysicalOperator)physicalPlan.getSuccessors(fil).get(0);
+        assertEquals( POStore.class, stor.getClass() );
+        assertTrue(  ((POStore)stor).getSFile().getFileName().contains("empty") );
+    }
+    
+    public void testMultiStore() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd1.txt' as (id, c);");
+        lpt.buildPlan("b = load 'd2.txt'as (id, c);");
+        lpt.buildPlan("c = load 'd3.txt' as (id, c);");
+        lpt.buildPlan("d = join a by id, b by c;");        
+        lpt.buildPlan("e = filter d by a::id==NULL AND b::c==NULL;");
+        lpt.buildPlan("f = join e by b::c, c by id;");
+        lpt.buildPlan("g = filter f by b::id==NULL AND c::c==NULL;");
+        LogicalPlan plan = lpt.buildPlan("store g into 'empty2';");        
+        
+        // check basics
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        assertEquals(16, phyPlan.size());
+        assertEquals(phyPlan.getRoots().size(), 3);
+        assertEquals(phyPlan.getLeaves().size(), 1 );
+
+        // Check Load and LocalRearrange and GlobalRearrange
+        PhysicalOperator LoR = (PhysicalOperator)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0);
+        assertEquals( POLocalRearrange.class, LoR.getClass() );
+        POLocalRearrange Lor = (POLocalRearrange) LoR;
+        PhysicalOperator prj1 = Lor.getPlans().get(0).getLeaves().get(0);
+        assertEquals( POProject.class, prj1.getClass() );
+        assertEquals(0, ((POProject)prj1).getColumn() );
+        PhysicalOperator inp1 = Lor.getInputs().get(0);
+        assertEquals( POLoad.class, inp1.getClass() );
+        assertTrue(  ((POLoad)inp1).getLFile().getFileName().contains("d3.txt") );
+
+        PhysicalOperator LoR1 = (PhysicalOperator)phyPlan.getSuccessors(phyPlan.getRoots().get(1)).get(0);
+        assertEquals( POLocalRearrange.class, LoR1.getClass() );
+        POLocalRearrange Lor1 = (POLocalRearrange) LoR1;
+        PhysicalOperator prj2 = Lor1.getPlans().get(0).getLeaves().get(0);
+        assertEquals( POProject.class, prj2.getClass() );
+        assertEquals(1, ((POProject)prj2).getColumn() );
+        PhysicalOperator inp2 = Lor1.getInputs().get(0);
+        assertEquals( POLoad.class, inp2.getClass() );
+        assertTrue(  ((POLoad)inp2).getLFile().getFileName().contains("d2.txt") );
+        
+        PhysicalOperator GoR = (PhysicalOperator)phyPlan.getSuccessors(LoR).get(0);
+        assertEquals( POGlobalRearrange.class, GoR.getClass() );
+        
+        PhysicalOperator Pack = (PhysicalOperator)phyPlan.getSuccessors(GoR).get(0);
+        assertEquals( POPackage.class, Pack.getClass() );
+        
+        PhysicalOperator LoR2 = (PhysicalOperator)phyPlan.getSuccessors(phyPlan.getRoots().get(2)).get(0);
+        assertEquals( POLocalRearrange.class, LoR2.getClass() );
+        POLocalRearrange Lor2 = (POLocalRearrange) LoR2;
+        PhysicalOperator prj3 = Lor2.getPlans().get(0).getLeaves().get(0);
+        assertEquals( POProject.class, prj3.getClass() );
+        assertEquals(0, ((POProject)prj3).getColumn() );
+        PhysicalOperator inp3 = Lor2.getInputs().get(0);
+        assertEquals( POLoad.class, inp3.getClass() );
+        assertTrue(  ((POLoad)inp3).getLFile().getFileName().contains("d1.txt") );
+        
+        PhysicalOperator GoR2 = (PhysicalOperator)phyPlan.getSuccessors(LoR2).get(0);
+        assertEquals( POGlobalRearrange.class, GoR2.getClass() );
+        
+        PhysicalOperator Pack2 = (PhysicalOperator)phyPlan.getSuccessors(GoR2).get(0);
+        assertEquals( POPackage.class, Pack2.getClass() );
+        
+        // Check for ForEach
+        PhysicalOperator ForE = (PhysicalOperator)phyPlan.getSuccessors(Pack).get(0);
+        assertEquals( POForEach.class, ForE.getClass() );
+        PhysicalOperator prj4 = ((POForEach)ForE).getInputPlans().get(0).getLeaves().get(0);
+        assertEquals( POProject.class, prj4.getClass() );
+        assertEquals( 1, ((POProject)prj4).getColumn() ); 
+        PhysicalOperator prj5 = ((POForEach)ForE).getInputPlans().get(1).getLeaves().get(0);
+        assertEquals( POProject.class, prj5.getClass() );
+        assertEquals( 2, ((POProject)prj5).getColumn() );
+        
+        PhysicalOperator ForE2 = (PhysicalOperator)phyPlan.getSuccessors(Pack2).get(0);
+        assertEquals( POForEach.class, ForE2.getClass() );
+        PhysicalOperator prj6 = ((POForEach)ForE2).getInputPlans().get(0).getLeaves().get(0);
+        assertEquals( POProject.class, prj6.getClass() );
+        assertEquals( 1, ((POProject)prj6).getColumn() ); 
+        PhysicalOperator prj7 = ((POForEach)ForE2).getInputPlans().get(1).getLeaves().get(0);
+        assertEquals( POProject.class, prj7.getClass() );
+        assertEquals( 2, ((POProject)prj7).getColumn() );
+        
+        // Check Filter Operator
+        PhysicalOperator fil = (PhysicalOperator)phyPlan.getSuccessors(ForE).get(0);
+        assertEquals( POFilter.class, fil.getClass() );
+        
+        PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+        List<PhysicalOperator> filRoots = filPlan.getRoots();
+        
+        assertEquals( ConstantExpression.class, filRoots.get(0).getClass() );
+        ConstantExpression ce1 = (ConstantExpression) filRoots.get(0);
+        assertEquals( null, ce1.getValue() ); 
+        assertEquals( ConstantExpression.class, filRoots.get(2).getClass() );
+        ConstantExpression ce2 = (ConstantExpression) filRoots.get(2);
+        assertEquals( null, ce2.getValue() );
+        assertEquals( POProject.class, filRoots.get(1).getClass() );
+        POProject prj8 = (POProject) filRoots.get(1);
+        assertEquals( 5, prj8.getColumn() );
+        assertEquals( POProject.class, filRoots.get(3).getClass() );
+        POProject prj9 = (POProject) filRoots.get(3);
+        assertEquals( 2, prj9.getColumn() );
+        
+        
+        PhysicalOperator fil2 = (PhysicalOperator)phyPlan.getSuccessors(ForE2).get(0);
+        assertEquals( POFilter.class, fil2.getClass() );
+        
+        PhysicalOperator LoR3 = (PhysicalOperator)phyPlan.getSuccessors(fil2).get(0);
+        assertEquals( POLocalRearrange.class, LoR3.getClass() );
+        POLocalRearrange Lor3 = (POLocalRearrange) LoR3;
+        PhysicalOperator prj12 = Lor3.getPlans().get(0).getLeaves().get(0);
+        assertEquals( POProject.class, prj12.getClass() );
+        assertEquals(3, ((POProject)prj12).getColumn() );
+        
+        PhysicalPlan filPlan2 = ((POFilter)fil2).getPlan();
+        List<PhysicalOperator> filRoots2 = filPlan2.getRoots();
+        
+        assertEquals( ConstantExpression.class, filRoots2.get(0).getClass() );
+        ConstantExpression ce3 = (ConstantExpression) filRoots2.get(0);
+        assertEquals( null, ce3.getValue() ); 
+        assertEquals( ConstantExpression.class, filRoots2.get(2).getClass() );
+        ConstantExpression ce4 = (ConstantExpression) filRoots2.get(2);
+        assertEquals( null, ce4.getValue() );
+        assertEquals( POProject.class, filRoots2.get(1).getClass() );
+        POProject prj10 = (POProject) filRoots2.get(1);
+        assertEquals( 3, prj10.getColumn() );
+        assertEquals( POProject.class, filRoots2.get(3).getClass() );
+        POProject prj11 = (POProject) filRoots2.get(3);
+        assertEquals( 0, prj11.getColumn() );
+        
+        // Check Store Operator
+        PhysicalOperator stor = (PhysicalOperator)phyPlan.getLeaves().get(0);
+        assertEquals( stor, phyPlan.getSuccessors(fil).get(0));
+        assertEquals( POStore.class, stor.getClass() );
+        assertTrue(  ((POStore)stor).getSFile().getFileName().contains("empty") );
+    }
+    
+    public void testPlanWithCast() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, c);");
+        lpt.buildPlan("b = filter a by (int)id==10;");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        
+        // check basics
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        assertEquals(3, phyPlan.size());
+        assertEquals(phyPlan.getRoots().size(), 1);
+        assertEquals(phyPlan.getLeaves().size(), 1 );
+        
+        PhysicalOperator load = phyPlan.getRoots().get(0);
+        assertEquals( POLoad.class, load.getClass() );
+        assertTrue(  ((POLoad)load).getLFile().getFileName().contains("d.txt"));
+        
+        PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
+        assertEquals( POFilter.class, fil.getClass() );
+        PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+        
+        PhysicalOperator equal = filPlan.getLeaves().get(0);
+        assertEquals( EqualToExpr.class, equal.getClass() );
+        assertEquals( DataType.BOOLEAN, ((EqualToExpr)equal).getResultType() );
+        
+        PhysicalOperator constExpr = ((EqualToExpr)equal).getRhs();
+        assertEquals( ConstantExpression.class, constExpr.getClass() );
+        assertEquals( 10, ((ConstantExpression)constExpr).getValue() );
+        
+        PhysicalOperator castExpr = ((EqualToExpr)equal).getLhs();
+        assertEquals( POCast.class, castExpr.getClass() );
+        assertEquals( DataType.INTEGER, ((POCast)castExpr).getResultType() );
+        
+        PhysicalOperator prj = ((POCast)castExpr).getInputs().get(0);
+        assertEquals( POProject.class, prj.getClass() );
+        assertEquals( 0, ((POProject)prj).getColumn() );
+        
+        PhysicalOperator stor = phyPlan.getLeaves().get(0);
+        assertEquals( POStore.class, stor.getClass() );
+        assertTrue( ((POStore)stor).getSFile().getFileName().contains( "empty" ) );        
+    }
+    
+    public void testPlanWithGreaterThan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, c);");
+        lpt.buildPlan("b = filter a by (int)id>10;");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        
+        // check basics
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        assertEquals(3, phyPlan.size());
+        assertEquals(phyPlan.getRoots().size(), 1);
+        assertEquals(phyPlan.getLeaves().size(), 1 );
+        
+        PhysicalOperator load = phyPlan.getRoots().get(0);
+        assertEquals( POLoad.class, load.getClass() );
+        assertTrue(  ((POLoad)load).getLFile().getFileName().contains("d.txt"));
+        
+        PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
+        assertEquals( POFilter.class, fil.getClass() );
+        PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+        
+        PhysicalOperator greaterThan = filPlan.getLeaves().get(0);
+        assertEquals( GreaterThanExpr.class, greaterThan.getClass() );
+        assertEquals( DataType.BOOLEAN, ((GreaterThanExpr)greaterThan).getResultType() );
+        
+        PhysicalOperator constExpr = ((GreaterThanExpr)greaterThan).getRhs();
+        assertEquals( ConstantExpression.class, constExpr.getClass() );
+        assertEquals( 10, ((ConstantExpression)constExpr).getValue() );
+        
+        PhysicalOperator castExpr = ((GreaterThanExpr)greaterThan).getLhs();
+        assertEquals( POCast.class, castExpr.getClass() );
+        assertEquals( DataType.INTEGER, ((POCast)castExpr).getResultType() );
+        
+        PhysicalOperator prj = ((POCast)castExpr).getInputs().get(0);
+        assertEquals( POProject.class, prj.getClass() );
+        assertEquals( 0, ((POProject)prj).getColumn() );
+        
+        PhysicalOperator stor = phyPlan.getLeaves().get(0);
+        assertEquals( POStore.class, stor.getClass() );
+        assertTrue( ((POStore)stor).getSFile().getFileName().contains( "empty" ) );  
+    }
+    
+    public void testPlanWithLessThan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, c);");
+        lpt.buildPlan("b = filter a by (int)id<10;");
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");
+        
+        // check basics
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        assertEquals(3, phyPlan.size());
+        assertEquals(phyPlan.getRoots().size(), 1);
+        assertEquals(phyPlan.getLeaves().size(), 1 );
+        
+        PhysicalOperator load = phyPlan.getRoots().get(0);
+        assertEquals( POLoad.class, load.getClass() );
+        assertTrue(  ((POLoad)load).getLFile().getFileName().contains("d.txt"));
+        
+        PhysicalOperator fil = phyPlan.getSuccessors(load).get(0);
+        assertEquals( POFilter.class, fil.getClass() );
+        PhysicalPlan filPlan = ((POFilter)fil).getPlan();
+        
+        PhysicalOperator lessThan = filPlan.getLeaves().get(0);
+        assertEquals( LessThanExpr.class, lessThan.getClass() );
+        assertEquals( DataType.BOOLEAN, ((LessThanExpr)lessThan).getResultType() );
+        
+        PhysicalOperator constExpr = ((LessThanExpr)lessThan).getRhs();
+        assertEquals( ConstantExpression.class, constExpr.getClass() );
+        assertEquals( 10, ((ConstantExpression)constExpr).getValue() );
+        
+        PhysicalOperator castExpr = ((LessThanExpr)lessThan).getLhs();
+        assertEquals( POCast.class, castExpr.getClass() );
+        assertEquals( DataType.INTEGER, ((POCast)castExpr).getResultType() );
+        
+        PhysicalOperator prj = ((POCast)castExpr).getInputs().get(0);
+        assertEquals( POProject.class, prj.getClass() );
+        assertEquals( 0, ((POProject)prj).getColumn() );
+        
+        PhysicalOperator stor = phyPlan.getLeaves().get(0);
+        assertEquals( POStore.class, stor.getClass() );
+        assertTrue( ((POStore)stor).getSFile().getFileName().contains( "empty" ) );  
+    }
+
+    public void testForeachPlan() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, c);");
+        lpt.buildPlan("b = foreach a generate id, c;");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        
+        assertEquals(phyPlan.size(), 3);
+        POLoad load = (POLoad)phyPlan.getRoots().get(0);        
+        assertEquals(phyPlan.getLeaves().get(0).getClass(), POStore.class);
+        POForEach foreach = (POForEach)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0);
+        
+        assertEquals(foreach.getInputPlans().size(), 2);
+        
+        PhysicalPlan inner = foreach.getInputPlans().get(0);
+        assertEquals(inner.size(), 1);
+        POProject prj = (POProject)inner.getRoots().get(0);
+        assertEquals(prj.getColumn(), 0);
+        assertEquals(prj.getInputs().get(0), load);
+        
+        inner = foreach.getInputPlans().get(1);
+        assertEquals(inner.size(), 1);
+        prj = (POProject)inner.getRoots().get(0);
+        assertEquals(prj.getColumn(), 1);
+        assertEquals(prj.getInputs().get(0), load);
+        Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]);
+        assertFalse(flat[0]);
+        assertFalse(flat[1]);
+    }
+    
+    public void testForeachPlan2() throws Exception {
+        LogicalPlanTester lpt = new LogicalPlanTester();
+        lpt.buildPlan("a = load 'd.txt' as (id, c:bag{t:(s,v)});");
+        lpt.buildPlan("b = foreach a generate id, flatten(c);");        
+        LogicalPlan plan = lpt.buildPlan("store b into 'empty';");  
+        
+        org.apache.pig.experimental.logical.relational.LogicalPlan newLogicalPlan = migratePlan(plan);
+        LogicalRelationalOperator ld =  (LogicalRelationalOperator)newLogicalPlan.getSources().get(0);
+        LogicalRelationalOperator fe = (LogicalRelationalOperator)newLogicalPlan.getSuccessors(ld).get(0);
+        LogicalSchema ls = fe.getSchema();
+        assertEquals(1, ls.getField(0).uid);
+        assertEquals(4, ls.getField(1).uid);
+        assertEquals(5, ls.getField(2).uid);
+        
+        LogicalSchema expected = new LogicalSchema();
+        expected.addField(new LogicalFieldSchema("id", null, DataType.BYTEARRAY));
+        expected.addField(new LogicalFieldSchema("s", null, DataType.BYTEARRAY));
+        expected.addField(new LogicalFieldSchema("v", null, DataType.BYTEARRAY));
+        assertTrue(expected.isEqual(ls));
+        
+        
+        PhysicalPlan phyPlan = translatePlan(newLogicalPlan);
+        
+        assertEquals(phyPlan.size(), 3);
+        POLoad load = (POLoad)phyPlan.getRoots().get(0);        
+        assertEquals(phyPlan.getLeaves().get(0).getClass(), POStore.class);
+        POForEach foreach = (POForEach)phyPlan.getSuccessors(phyPlan.getRoots().get(0)).get(0);
+        
+        assertEquals(foreach.getInputPlans().size(), 2);
+        
+        PhysicalPlan inner = foreach.getInputPlans().get(0);
+        assertEquals(inner.size(), 1);
+        POProject prj = (POProject)inner.getRoots().get(0);
+        assertEquals(prj.getColumn(), 0);
+        assertEquals(prj.getInputs().get(0), load);
+        
+        inner = foreach.getInputPlans().get(1);
+        assertEquals(inner.size(), 1);
+        prj = (POProject)inner.getRoots().get(0);
+        assertEquals(prj.getColumn(), 1);
+        assertEquals(prj.getInputs().get(0), load);
+        Boolean[] flat = foreach.getToBeFlattened().toArray(new Boolean[0]);
+        assertFalse(flat[0]);
+        assertTrue(flat[1]);
+    }
+    
+}

Added: hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java?rev=909165&view=auto
==============================================================================
--- hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java (added)
+++ hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java Thu Feb 11 22:12:36 2010
@@ -0,0 +1,239 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.test;
+
+import java.io.IOException;
+
+import org.apache.pig.FuncSpec;
+import org.apache.pig.data.DataType;
+import org.apache.pig.experimental.logical.expression.AndExpression;
+import org.apache.pig.experimental.logical.expression.ConstantExpression;
+import org.apache.pig.experimental.logical.expression.EqualExpression;
+import org.apache.pig.experimental.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.experimental.logical.expression.ProjectExpression;
+import org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.experimental.logical.relational.LOFilter;
+import org.apache.pig.experimental.logical.relational.LOJoin;
+import org.apache.pig.experimental.logical.relational.LOLoad;
+import org.apache.pig.experimental.logical.relational.LogicalPlan;
+import org.apache.pig.experimental.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.experimental.logical.relational.LogicalSchema;
+import org.apache.pig.experimental.logical.relational.LOJoin.JOINTYPE;
+import org.apache.pig.impl.io.FileSpec;
+import org.apache.pig.impl.util.MultiMap;
+import org.junit.Test;
+
+import junit.framework.TestCase;
+
+/**
+ * Test end to end logical optimizations.
+ */
+public class TestExperimentalLogicalOptimizer extends TestCase {
+    
+    @Test
+    public void testFilterPushDown() throws IOException {
+        // A logical plan for:
+        // A = load 'bla' as (x, y);
+        // B = load 'morebla' as (a, b);
+        // C = join A on x, B on a;
+        // D = filter C by x = a and x = 0 and b = 1 and y = b;
+        // store D into 'whatever';
+        
+        // A = load
+        LogicalPlan lp = new LogicalPlan();
+        {
+        	LogicalSchema aschema = new LogicalSchema();
+        	aschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"x", null, DataType.INTEGER));
+        	aschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"y", null, DataType.INTEGER));
+        	aschema.getField(0).uid = 1;
+        	aschema.getField(1).uid = 2;
+        	LOLoad A = new LOLoad(new FileSpec("bla", new FuncSpec("PigStorage", "\t")), aschema, lp);
+        	lp.add(A);
+	        
+        	// B = load
+        	LogicalSchema bschema = new LogicalSchema();
+        	bschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"a", null, DataType.INTEGER));
+        	bschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"b", null, DataType.INTEGER));
+        	bschema.getField(0).uid = 3;
+        	bschema.getField(1).uid = 4;
+        	LOLoad B = new LOLoad(null, bschema, lp);
+        	lp.add(B);
+	        
+        	// C = join
+        	LogicalSchema cschema = new LogicalSchema();
+        	cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"x", null, DataType.INTEGER));
+        	cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"y", null, DataType.INTEGER));
+        	cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"a", null, DataType.INTEGER));
+        	cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"b", null, DataType.INTEGER));
+        	cschema.getField(0).uid = 1;
+        	cschema.getField(1).uid = 2;
+        	cschema.getField(2).uid = 3;
+        	cschema.getField(3).uid = 4;
+        	LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+        	ProjectExpression x = new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0);
+        	x.neverUseForRealSetUid(1);
+        	LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+        	ProjectExpression y = new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0);
+        	y.neverUseForRealSetUid(3);
+        	MultiMap<Integer, LogicalExpressionPlan> mm = 
+            	new MultiMap<Integer, LogicalExpressionPlan>();
+        	mm.put(0, aprojplan);
+        	mm.put(1, bprojplan);
+        	LOJoin C = new LOJoin(lp, mm, JOINTYPE.HASH, new boolean[] {true, true});
+        	C.neverUseForRealSetSchema(cschema);
+        	lp.add(new LogicalRelationalOperator[] {A, B}, C, null);
+        
+        	// D = filter
+        	LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        	ProjectExpression fx = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 0);
+        	fx.neverUseForRealSetUid(1);
+        	ConstantExpression fc0 = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(0));
+        	EqualExpression eq1 = new EqualExpression(filterPlan, fx, fc0);
+        	ProjectExpression fanotherx = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 0);
+        	fanotherx.neverUseForRealSetUid(1);
+        	ProjectExpression fa = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 2);
+        	fa.neverUseForRealSetUid(3);
+        	EqualExpression eq2 = new EqualExpression(filterPlan, fanotherx, fa);
+        	AndExpression and1 = new AndExpression(filterPlan, eq1, eq2);
+        	ProjectExpression fb = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 3);
+        	fb.neverUseForRealSetUid(4);
+        	ConstantExpression fc1 = new ConstantExpression(filterPlan, DataType.INTEGER, new Integer(1));
+        	EqualExpression eq3 = new EqualExpression(filterPlan, fb, fc1);
+        	AndExpression and2 = new AndExpression(filterPlan, and1, eq3);
+        	ProjectExpression fanotherb = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 3);
+        	fanotherb.neverUseForRealSetUid(4);
+        	ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1);
+        	fy.neverUseForRealSetUid(2);
+        	EqualExpression eq4 = new EqualExpression(filterPlan, fy, fanotherb);
+        	new AndExpression(filterPlan, and2, eq4);
+        
+        	LOFilter D = new LOFilter(lp, filterPlan);
+        	D.neverUseForRealSetSchema(cschema);
+        	// Connect D to B, since the transform has happened.
+            lp.add(C, D, (LogicalRelationalOperator)null);
+        }
+        
+        LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(lp, 500);
+        optimizer.optimize();
+        
+        LogicalPlan expected = new LogicalPlan();
+        {
+            // A = load
+        	LogicalSchema aschema = new LogicalSchema();
+        	aschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"x", null, DataType.INTEGER));
+        	aschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"y", null, DataType.INTEGER));
+        	aschema.getField(0).uid = 1;
+        	aschema.getField(1).uid = 2;
+        	LOLoad A = new LOLoad(new FileSpec("bla", new FuncSpec("PigStorage", "\t")), aschema, expected);
+        	expected.add(A);
+        	
+        	// DA = filter
+        	LogicalExpressionPlan DAfilterPlan = new LogicalExpressionPlan();
+        	ProjectExpression fx = new ProjectExpression(DAfilterPlan, DataType.INTEGER, 0, 0);
+        	fx.neverUseForRealSetUid(1);
+        	ConstantExpression fc0 = new ConstantExpression(DAfilterPlan, DataType.INTEGER, new Integer(0));
+        	new EqualExpression(DAfilterPlan, fx, fc0);
+	        
+        	LOFilter DA = new LOFilter(expected, DAfilterPlan);
+        	DA.neverUseForRealSetSchema(aschema);
+        	expected.add(A, DA, (LogicalRelationalOperator)null);
+	        
+        	// B = load
+        	LogicalSchema bschema = new LogicalSchema();
+        	bschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"a", null, DataType.INTEGER));
+        	bschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"b", null, DataType.INTEGER));
+        	bschema.getField(0).uid = 3;
+        	bschema.getField(1).uid = 4;
+        	LOLoad B = new LOLoad(null, bschema, expected);
+        	expected.add(B);
+        	
+        	// DB = filter
+        	LogicalExpressionPlan DBfilterPlan = new LogicalExpressionPlan();
+        	ProjectExpression fb = new ProjectExpression(DBfilterPlan, DataType.INTEGER, 0, 1);
+        	fb.neverUseForRealSetUid(4);
+        	ConstantExpression fc1 = new ConstantExpression(DBfilterPlan, DataType.INTEGER, new Integer(1));
+        	new EqualExpression(DBfilterPlan, fb, fc1);
+	        
+        	LOFilter DB = new LOFilter(expected, DBfilterPlan);
+        	DB.neverUseForRealSetSchema(bschema);
+        	expected.add(B, DB, (LogicalRelationalOperator)null);
+	        
+        	// C = join
+        	LogicalSchema cschema = new LogicalSchema();
+        	cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"x", null, DataType.INTEGER));
+        	cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"y", null, DataType.INTEGER));
+        	cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"a", null, DataType.INTEGER));
+        	cschema.addField(new LogicalSchema.LogicalFieldSchema(
+            	"b", null, DataType.INTEGER));
+        	cschema.getField(0).uid = 1;
+        	cschema.getField(1).uid = 2;
+        	cschema.getField(2).uid = 3;
+        	cschema.getField(3).uid = 4;
+        	LogicalExpressionPlan aprojplan = new LogicalExpressionPlan();
+        	ProjectExpression x = new ProjectExpression(aprojplan, DataType.INTEGER, 0, 0);
+        	x.neverUseForRealSetUid(1);
+        	LogicalExpressionPlan bprojplan = new LogicalExpressionPlan();
+        	ProjectExpression y = new ProjectExpression(bprojplan, DataType.INTEGER, 1, 0);
+        	y.neverUseForRealSetUid(3);
+        	MultiMap<Integer, LogicalExpressionPlan> mm = 
+            	new MultiMap<Integer, LogicalExpressionPlan>();
+        	mm.put(0, aprojplan);
+        	mm.put(1, bprojplan);
+        	LOJoin C = new LOJoin(expected, mm, JOINTYPE.HASH, new boolean[] {true, true});
+        	C.neverUseForRealSetSchema(cschema);
+        	expected.add(new LogicalRelationalOperator[] {DA, DB}, C, null);
+	        
+        	// D = filter
+        	LogicalExpressionPlan filterPlan = new LogicalExpressionPlan();
+        	ProjectExpression fanotherx = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 0);
+        	fanotherx.neverUseForRealSetUid(1);
+        	ProjectExpression fa = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 2);
+        	fa.neverUseForRealSetUid(3);
+        	EqualExpression eq2 = new EqualExpression(filterPlan, fanotherx, fa);
+        	ProjectExpression fanotherb = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 3);
+        	fanotherb.neverUseForRealSetUid(4);
+        	ProjectExpression fy = new ProjectExpression(filterPlan, DataType.INTEGER, 0, 1);
+        	fy.neverUseForRealSetUid(2);
+        	EqualExpression eq4 = new EqualExpression(filterPlan, fy, fanotherb);
+        	new AndExpression(filterPlan, eq2, eq4);
+	        
+        	LOFilter D = new LOFilter(expected, filterPlan);
+        	D.neverUseForRealSetSchema(cschema);
+        	expected.add(C, D, (LogicalRelationalOperator)null);
+        }
+        
+        assertTrue( lp.isEqual(expected) );
+        // assertEquals(lp, expected);
+    }
+
+}