You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by da...@apache.org on 2010/08/04 19:46:48 UTC

svn commit: r982345 [1/13] - in /hadoop/pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/experimental/ src/org/apache/pig/newplan/ src/org/apache/pig/newplan/logical/ src/org/apache/pig/newplan/log...

Author: daijy
Date: Wed Aug  4 17:46:42 2010
New Revision: 982345

URL: http://svn.apache.org/viewvc?rev=982345&view=rev
Log:
PIG-1178: LogicalPlan and Optimizer are too complex and hard to work with (PIG-1178-5.patch)

Added:
    hadoop/pig/trunk/src/org/apache/pig/newplan/
    hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalExpPlanMigrationVistor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/Util.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/AddExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/AllSameExpressionVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/AndExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/BagDereferenceExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/BinCondExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/BinaryExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/CastExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ColumnExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ConstantExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DereferenceExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/DivideExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/EqualExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ExpToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/GreaterThanEqualExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/GreaterThanExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/IsNullExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LessThanEqualExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LessThanExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionPlan.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/LogicalExpressionVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/MapLookupExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ModExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/MultiplyExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/NegativeExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/NotEqualExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/NotExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/OrExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/ProjectExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/RegexExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/SubtractExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UnaryExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/expression/UserFuncExpression.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllExpressionVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameRalationalNodesVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/AllSameVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ExprPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/LogicalPlanPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/PlanPrinter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/ProjectionPatcher.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaPatcher.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/SchemaResetter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/optimizer/UidStamper.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOForEach.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOGenerate.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOInnerLoad.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplit.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSplitOutput.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStore.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOStream.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlanVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalNodesVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalRelationalOperator.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/relational/SchemaNotDefinedException.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/AddForEach.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnMapKeyPrune.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneHelper.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/ColumnPruneVisitor.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/FilterAboveForeach.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MapKeysPruneHelper.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeFilter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/PushUpFilter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/SplitFilter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/logical/rules/WholePlanRule.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/
    hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanOptimizer.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/PlanTransformListener.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Rule.java
    hadoop/pig/trunk/src/org/apache/pig/newplan/optimizer/Transformer.java
    hadoop/pig/trunk/test/newlogicalplan-tests
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanColumnPrune2.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterAboveForeach.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanFilterRule.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanListener.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanLogToPhyTranslationVisitor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanLogicalOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanOperatorPlan.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanPruneMapKeys.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestNewPlanRule.java
Removed:
    hadoop/pig/trunk/src/org/apache/pig/experimental/
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalColumnPrune.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterAboveForeach.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalFilterRule.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalListener.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogToPhyTranslationVisitor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalLogicalOptimizer.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalOperatorPlan.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalPruneMapKeys.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestExperimentalRule.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPruneColumnNewLogicalPlan.java
Modified:
    hadoop/pig/trunk/build.xml
    hadoop/pig/trunk/src/org/apache/pig/PigServer.java
    hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/JobStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/PigStats.java
    hadoop/pig/trunk/src/org/apache/pig/tools/pigstats/ScriptState.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestLogicalPlanMigrationVisitor.java
    hadoop/pig/trunk/test/org/apache/pig/test/TestPigRunner.java
    hadoop/pig/trunk/test/org/apache/pig/test/Util.java

Modified: hadoop/pig/trunk/build.xml
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/build.xml?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/build.xml (original)
+++ hadoop/pig/trunk/build.xml Wed Aug  4 17:46:42 2010
@@ -80,6 +80,7 @@
     <property name="test.timeout" value="2700000" />
     <property name="test.junit.output.format" value="plain" />
     <property name="test.commit.file" value="${test.src.dir}/commit-tests"/>
+    <property name="test.newlogicalplan.file" value="${test.src.dir}/newlogicalplan-tests"/>
     <property name="test.unit.file" value="${test.src.dir}/unit-tests"/>
     <property name="test.smoke.file" value="${test.src.dir}/smoke-tests"/>
     <property name="test.all.file" value="${test.src.dir}/all-tests"/>
@@ -546,6 +547,10 @@
         <macro-test-runner test.file="${test.commit.file}" />
     </target>
 
+    <target name="test-newlogicalplan" depends="compile-test,jar-withouthadoop" description="Run approximate 10-minute set of unit tests prior to commiting">
+        <macro-test-runner test.file="${test.newlogicalplan.file}" />
+    </target>
+
     <target name="test-unit" depends="compile-test,jar-withouthadoop" description="Run all true unit tests">
         <macro-test-runner test.file="${test.unit.file}" />
     </target>

Modified: hadoop/pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/PigServer.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/PigServer.java Wed Aug  4 17:46:42 2010
@@ -59,9 +59,6 @@ import org.apache.pig.classification.Int
 import org.apache.pig.classification.InterfaceStability;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
-import org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer;
-import org.apache.pig.experimental.logical.optimizer.UidStamper;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.InterStorage;
@@ -95,6 +92,8 @@ import org.apache.pig.impl.streaming.Str
 import org.apache.pig.impl.util.LogUtils;
 import org.apache.pig.impl.util.ObjectSerializer;
 import org.apache.pig.impl.util.PropertiesUtil;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
 import org.apache.pig.pen.ExampleGenerator;
 import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.tools.grunt.GruntParser;
@@ -881,12 +880,8 @@ public class PigServer {
             if( pigContext.getProperties().getProperty("pig.usenewlogicalplan", "false").equals("true") ) {
                 LogicalPlanMigrationVistor migrator = new LogicalPlanMigrationVistor(lp);
                 migrator.visit();
-                org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = migrator.getNewLogicalPlan();
+                org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = migrator.getNewLogicalPlan();
                 
-                // set uids
-                UidStamper stamper = new UidStamper(newPlan);
-                stamper.visit();
-
                 LogicalPlanOptimizer optimizer = new LogicalPlanOptimizer(newPlan, 3);
                 optimizer.optimize();                
                 

Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=982345&r1=982344&r2=982345&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Wed Aug  4 17:46:42 2010
@@ -50,8 +50,6 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStore;
 import org.apache.pig.backend.hadoop.executionengine.util.MapRedUtil;
-import org.apache.pig.experimental.logical.LogicalPlanMigrationVistor;
-import org.apache.pig.experimental.logical.optimizer.UidStamper;
 import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.io.FileSpec;
@@ -59,6 +57,8 @@ import org.apache.pig.impl.io.InterStora
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
+import org.apache.pig.newplan.logical.optimizer.SchemaResetter;
 import org.apache.pig.tools.pigstats.OutputStats;
 import org.apache.pig.tools.pigstats.PigStats;
 
@@ -235,20 +235,19 @@ public class HExecutionEngine {
                 // translate old logical plan to new plan
                 LogicalPlanMigrationVistor visitor = new LogicalPlanMigrationVistor(plan);
                 visitor.visit();
-                org.apache.pig.experimental.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+                org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
                 
-                // set uids
-                UidStamper stamper = new UidStamper(newPlan);
-                stamper.visit();
+                SchemaResetter schemaResetter = new SchemaResetter(newPlan);
+                schemaResetter.visit();
                 
                 // run optimizer
-                org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer optimizer = 
-                    new org.apache.pig.experimental.logical.optimizer.LogicalPlanOptimizer(newPlan, 100);
+                org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer optimizer = 
+                    new org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer(newPlan, 100);
                 optimizer.optimize();
                 
                 // translate new logical plan to physical plan
-                org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor translator = 
-                    new org.apache.pig.experimental.logical.relational.LogToPhyTranslationVisitor(newPlan);
+                org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor translator = 
+                    new org.apache.pig.newplan.logical.relational.LogToPhyTranslationVisitor(newPlan);
                 
                 translator.setPigContext(pigContext);
                 translator.visit();

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,277 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.pig.impl.util.Pair;
+
+public abstract class BaseOperatorPlan implements OperatorPlan {
+
+    protected Set<Operator> ops;
+    protected PlanEdge fromEdges;
+    protected PlanEdge toEdges;
+
+    private List<Operator> roots;
+    private List<Operator> leaves;
+    protected static final Log log =
+        LogFactory.getLog(BaseOperatorPlan.class);
+ 
+    public BaseOperatorPlan() {
+        ops = new HashSet<Operator>();
+        roots = new ArrayList<Operator>();
+        leaves = new ArrayList<Operator>();
+        fromEdges = new PlanEdge();
+        toEdges = new PlanEdge();
+    }
+    
+    /**
+     * Get number of nodes in the plan.
+     */
+    public int size() {
+        return ops.size();
+    }
+
+    /**
+     * Get all operators in the plan that have no predecessors.
+     * @return all operators in the plan that have no predecessors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getSources() {
+        if (roots.size() == 0 && ops.size() > 0) {
+            for (Operator op : ops) {               
+                if (toEdges.get(op) == null) {
+                    roots.add(op);
+                }
+            }
+        }
+        return roots;
+    }
+
+    /**
+     * Get all operators in the plan that have no successors.
+     * @return all operators in the plan that have no successors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getSinks() {
+        if (leaves.size() == 0 && ops.size() > 0) {
+            for (Operator op : ops) {
+                if (fromEdges.get(op) == null) {
+                    leaves.add(op);
+                }
+            }
+        }
+        return leaves;
+    }
+
+    /**
+     * For a given operator, get all operators immediately before it in the
+     * plan.
+     * @param op operator to fetch predecessors of
+     * @return list of all operators immediately before op, or null when
+     * no predecessors are recorded for op (e.g. op is a root).
+     * @throws IOException if op is not in the plan.
+     */
+    public List<Operator> getPredecessors(Operator op) throws IOException {
+        return (List<Operator>)toEdges.get(op);
+    }
+    
+    /**
+     * For a given operator, get all operators immediately after it.
+     * @param op operator to fetch successors of
+     * @return list of all operators immediately after op, or null when
+     * no successors are recorded for op (e.g. op is a leaf).
+     * @throws IOException if op is not in the plan.
+     */
+    public List<Operator> getSuccessors(Operator op) throws IOException {
+        return (List<Operator>)fromEdges.get(op);
+    }
+
+    /**
+     * Add a new operator to the plan.  It will not be connected to any
+     * existing operators.
+     * @param op operator to add
+     */
+    public void add(Operator op) {
+        markDirty();
+        ops.add(op);
+    }
+
+    /**
+     * Remove an operator from the plan.
+     * @param op Operator to be removed
+     * @throws IOException if the remove operation attempts to 
+     * remove an operator that is still connected to other operators.
+     */
+    public void remove(Operator op) throws IOException {
+        
+        if (fromEdges.containsKey(op) || toEdges.containsKey(op)) {
+            throw new IOException("Attempt to remove operator " + op.getName()
+                    + " that is still connected in the plan");
+        }
+        markDirty();
+        ops.remove(op);
+    }
+    
+    /**
+     * Connect two operators in the plan, controlling which position in the
+     * edge lists that the from and to edges are placed.
+     * @param from Operator edge will come from
+     * @param fromPos Position in the array for the from edge
+     * @param to Operator edge will go to
+     * @param toPos Position in the array for the to edge
+     */
+    public void connect(Operator from,
+                        int fromPos,
+                        Operator to,
+                        int toPos) {
+        markDirty();
+        fromEdges.put(from, to, fromPos);
+        toEdges.put(to, from, toPos);
+    }
+    
+    /**
+     * Connect two operators in the plan.
+     * @param from Operator edge will come from
+     * @param to Operator edge will go to
+     */
+    public void connect(Operator from, Operator to) {
+        markDirty();
+        fromEdges.put(from, to);
+        toEdges.put(to, from);
+    }
+    
+    /**
+     * Disconnect two operators in the plan.
+     * @param from Operator edge is coming from
+     * @param to Operator edge is going to
+     * @return pair of positions, indicating the position in the from and
+     * to arrays.
+     * @throws IOException if the two operators aren't connected.
+     */
+    public Pair<Integer, Integer> disconnect(Operator from,
+                                             Operator to) throws IOException {
+        Pair<Operator, Integer> f = fromEdges.removeWithPosition(from, to);
+        if (f == null) { 
+            throw new IOException("Attempt to disconnect operators " + 
+                from.getName() + " and " + to.getName() +
+                " which are not connected.");
+        }
+        
+        Pair<Operator, Integer> t = toEdges.removeWithPosition(to, from);
+        if (t == null) { 
+            throw new IOException("Plan in inconssistent state " + 
+                from.getName() + " and " + to.getName() +
+                " connected in fromEdges but not toEdges.");
+        }
+        
+        markDirty();
+        return new Pair<Integer, Integer>(f.second, t.second);
+    }
+
+    private void markDirty() {
+        roots.clear();
+        leaves.clear();
+    }
+
+    public Iterator<Operator> getOperators() {
+        return ops.iterator();
+    }
+   
+    public boolean isEqual(OperatorPlan other) {
+        return isEqual(this, other);
+    }
+    
+    private static boolean checkPredecessors(Operator op1,
+                                      Operator op2) {
+        try {
+            List<Operator> preds = op1.getPlan().getPredecessors(op1);
+            List<Operator> otherPreds = op2.getPlan().getPredecessors(op2);
+            if (preds == null && otherPreds == null) {
+                // intentionally blank
+            } else if (preds == null || otherPreds == null) {
+                return false;
+            } else {
+                if (preds.size() != otherPreds.size()) return false;
+                for (int i = 0; i < preds.size(); i++) {
+                    Operator p1 = preds.get(i);
+                    Operator p2 = otherPreds.get(i);
+                    if (!p1.isEqual(p2)) return false;
+                    if (!checkPredecessors(p1, p2)) return false;
+                }
+            }
+            return true;
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+    }   
+    
+    protected static boolean isEqual(OperatorPlan p1, OperatorPlan p2) {
+        if (p1 == p2) {
+            return true;
+        }
+        
+        if (p1 != null && p2 != null) {
+            List<Operator> leaves = p1.getSinks();
+            List<Operator> otherLeaves = p2.getSinks();
+            if (leaves.size() != otherLeaves.size()) return false;
+            // Must find some leaf that is equal to each leaf.  There is no
+            // guarantee leaves will be returned in any particular order.
+            boolean foundAll = true;
+            for (Operator op1 : leaves) {
+                boolean foundOne = false;
+                for (Operator op2 : otherLeaves) {
+                    if (op1.isEqual(op2) && checkPredecessors(op1, op2)) {
+                        foundOne = true;
+                        break;
+                    }
+                }
+                foundAll &= foundOne;
+                if (!foundAll) return false;
+            }
+            return foundAll;
+        }
+        
+        return false;
+    }
+    
+    public void explain(PrintStream ps, String format, boolean verbose) throws IOException {
+    }
+    
+    @Override
+    public String toString() {
+        ByteArrayOutputStream os = new ByteArrayOutputStream();
+        PrintStream ps = new PrintStream(os);
+        try {
+            explain(ps,"",false);
+        } catch (IOException e) {
+            return "";
+        }
+        return os.toString();
+    }   
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/DependencyOrderWalker.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A walker to walk graphs in dependency order.  It is guaranteed that a node
+ * will not be visited until all of its predecessors have been visited.  This
+ * is equivalent to doing a topological sort on the graph and then visiting
+ * the nodes in order.
+ */
+public class DependencyOrderWalker extends PlanWalker {
+
+    /**
+     * @param plan for this walker to traverse.
+     */
+    public DependencyOrderWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new DependencyOrderWalker(plan);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws VisitorException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        // This is highly inefficient, but our graphs are small so it should be okay.
+        // The algorithm works by starting at any node in the graph, finding its
+        // predecessors and calling itself for each of those predecessors.  When it
+        // finds a node that has no unfinished predecessors it puts that node in the
+        // list.  It then unwinds itself putting each of the other nodes in the list.
+        // It keeps track of what nodes it's seen as it goes so it doesn't put any
+        // nodes in the graph twice.
+
+        List<Operator> fifo = new ArrayList<Operator>();
+        Set<Operator> seen = new HashSet<Operator>();
+        List<Operator> leaves = plan.getSinks();
+        if (leaves == null) return;
+        for (Operator op : leaves) {
+            doAllPredecessors(op, seen, fifo);
+        }
+
+        for (Operator op: fifo) {
+            op.accept(visitor);
+        }
+    }
+
+    protected void doAllPredecessors(Operator node,
+                                   Set<Operator> seen,
+                                   Collection<Operator> fifo) throws IOException {
+        if (!seen.contains(node)) {
+            // We haven't seen this one before.
+            Collection<Operator> preds = plan.getPredecessors(node);
+            if (preds != null && preds.size() > 0) {
+                // Do all our predecessors before ourself
+                for (Operator op : preds) {
+                    doAllPredecessors(op, seen, fifo);
+                }
+            }
+            // Now do ourself
+            seen.add(node);
+            fifo.add(node);
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstMemoryWalker.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.Stack;
+
+public class DepthFirstMemoryWalker extends DepthFirstWalker {
+    
+    private int level = 0;
+    private int startingLevel = 0;
+    private Stack<String> prefixStack;
+    private String currentPrefix = "";
+    
+    public DepthFirstMemoryWalker(OperatorPlan plan, int startingLevel) {
+        super(plan);
+        level = startingLevel;
+        this.startingLevel = startingLevel;
+        prefixStack = new Stack<String>();
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new DepthFirstMemoryWalker(plan, level);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws IOException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        List<Operator> roots = plan.getSources();
+        Set<Operator> seen = new HashSet<Operator>();
+
+        depthFirst(null, roots, seen, visitor);
+    }
+    
+    public String getPrefix() {
+        return currentPrefix;
+    }
+
+    private void depthFirst(Operator node,
+                            Collection<Operator> successors,
+                            Set<Operator> seen,
+                            PlanVisitor visitor) throws IOException {
+        if (successors == null) return;
+        
+        StringBuilder strb = new StringBuilder(); 
+        for(int i = 0; i < startingLevel; i++ ) {
+            strb.append("|\t");
+        }
+        if( ((level-1) - startingLevel ) >= 0 )
+            strb.append("\t");
+        for(int i = 0; i < ((level-1) - startingLevel ); i++ ) {
+            strb.append("|\t");
+        }
+        strb.append( "|\n" );
+        for(int i = 0; i < startingLevel; i++ ) {
+            strb.append("|\t");
+        }
+        if( ((level-1) - startingLevel ) >= 0 )
+            strb.append("\t");
+        for(int i = 0; i < ((level-1) - startingLevel ); i++ ) {
+            strb.append("|\t");
+        }
+        strb.append("|---");
+        currentPrefix = strb.toString();
+
+        for (Operator suc : successors) {
+            if (seen.add(suc)) {
+                suc.accept(visitor);
+                Collection<Operator> newSuccessors = plan.getSuccessors(suc);
+                level++;
+                prefixStack.push(currentPrefix);
+                depthFirst(suc, newSuccessors, seen, visitor);
+                level--;
+                currentPrefix = prefixStack.pop();
+            }
+        }
+    }
+}
\ No newline at end of file

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/DepthFirstWalker.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Do a depth first traversal of the graph.
+ */
+public class DepthFirstWalker extends PlanWalker {
+
+    public DepthFirstWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new DepthFirstWalker(plan);
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.
+     * @throws IOException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        List<Operator> roots = plan.getSources();
+        Set<Operator> seen = new HashSet<Operator>();
+
+        depthFirst(null, roots, seen, visitor);
+    }
+
+    private void depthFirst(Operator node,
+                            Collection<Operator> successors,
+                            Set<Operator> seen,
+                            PlanVisitor visitor) throws IOException {
+        if (successors == null) return;
+
+        for (Operator suc : successors) {
+            if (seen.add(suc)) {
+                suc.accept(visitor);
+                Collection<Operator> newSuccessors = plan.getSuccessors(suc);
+                depthFirst(suc, newSuccessors, seen, visitor);
+            }
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/Operator.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+public abstract class Operator {
+    
+    protected String name;
+    protected OperatorPlan plan; // plan that contains this operator
+    protected Map<String, Object> annotations;
+    protected final int hashPrime = 31;
+
+    public Operator(String n, OperatorPlan p) {
+        name = n;
+        plan = p;
+        annotations = new HashMap<String, Object>();
+    }
+
+    /**
+     * Accept a visitor at this node in the graph.
+     * @param v Visitor to accept.
+     * @throws IOException 
+     */
+    public abstract void accept(PlanVisitor v) throws IOException;
+
+    public String getName() {
+        return name;
+    }
+    
+    /**
+     * Get the plan associated with this operator.
+     * @return plan
+     */
+    public OperatorPlan getPlan() {
+        return plan;
+    }
+    
+    /**
+     * Add an annotation to a node in the plan.
+     * @param key string name of this annotation
+     * @param val value, as an Object
+     */
+    public void annotate(String key, Object val) {
+        annotations.put(key, val);
+    }
+    
+    /**
+     * Look to see if a node is annotated.
+     * @param key string name of annotation to look for
+     * @return value of the annotation, as an Object, or null if the key is
+     * not present in the map.
+     */
+    public Object getAnnotation(String key) {
+        return annotations.get(key);
+    }
+
+    /**
+     * Remove an annotation
+     * @param key the key of the annotation
+     * @return the original value of the annotation
+     */
+    public Object removeAnnotation(String key) {
+        return annotations.remove(key);
+    }
+    
+    /**
+     * This is like a shallow equals comparison.
+     * It returns true if two operators have equivalent properties even if they are 
+     * different objects. Here properties mean equivalent plan and equivalent name.
+     * @param operator
+     * @return true if two object have equivalent properties, else false
+     */
+    public abstract boolean isEqual(Operator operator);
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorPlan.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.pig.impl.util.Pair;
+
+public interface OperatorPlan {
+    
+    /**
+     * Get number of nodes in the plan.
+     */
+    public int size();
+
+    /**
+     * Get all operators in the plan that have no predecessors.
+     * @return all operators in the plan that have no predecessors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getSources();
+
+    /**
+     * Get all operators in the plan that have no successors.
+     * @return all operators in the plan that have no successors, or
+     * an empty list if the plan is empty.
+     */
+    public List<Operator> getSinks();
+
+    /**
+     * For a given operator, get all operators immediately before it in the
+     * plan.
+     * @param op operator to fetch predecessors of
+     * @return list of all operators immediately before op, or an empty list
+     * if op is a root.
+     * @throws IOException if op is not in the plan.
+     */
+    public List<Operator> getPredecessors(Operator op) throws IOException;
+    
+    /**
+     * For a given operator, get all operators immediately after it.
+     * @param op operator to fetch successors of
+     * @return list of all operators immediately after op, or an empty list
+     * if op is a leaf.
+     * @throws IOException if op is not in the plan.
+     */
+    public List<Operator> getSuccessors(Operator op) throws IOException;
+
+    /**
+     * Add a new operator to the plan.  It will not be connected to any
+     * existing operators.
+     * @param op operator to add
+     */
+    public void add(Operator op);
+
+    /**
+     * Remove an operator from the plan.
+     * @param op Operator to be removed
+     * @throws IOException if the remove operation attempts to 
+     * remove an operator that is still connected to other operators.
+     */
+    public void remove(Operator op) throws IOException;
+    
+    /**
+     * Connect two operators in the plan, controlling which position in the
+     * edge lists that the from and to edges are placed.
+     * @param from Operator edge will come from
+     * @param fromPos Position in the array for the from edge
+     * @param to Operator edge will go to
+     * @param toPos Position in the array for the to edge
+     */
+    public void connect(Operator from, int fromPos, Operator to, int toPos);
+    
+    /**
+     * Connect two operators in the plan.
+     * @param from Operator edge will come from
+     * @param to Operator edge will go to
+     */
+    public void connect(Operator from, Operator to);
+    
+    /**
+     * Disconnect two operators in the plan.
+     * @param from Operator edge is coming from
+     * @param to Operator edge is going to
+     * @return pair of positions, indicating the position in the from and
+     * to arrays.
+     * @throws IOException if the two operators aren't connected.
+     */
+    public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException;
+
+
+    /**
+     * Get an iterator of all operators in this plan
+     * @return an iterator of all operators in this plan
+     */
+    public Iterator<Operator> getOperators();
+    
+    /**
+     * This is like a shallow comparison.
+     * Two plans are equal if they have equivalent operators and equivalent 
+     * structure.
+     * @param other  object to compare
+     * @return boolean if both the plans are equivalent
+     */
+    public boolean isEqual( OperatorPlan other );
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/OperatorSubPlan.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.pig.impl.util.Pair;
+
+/**
+ * Class to represent a view of a plan. The view contains a subset of the plan.
+ * All the operators returned from the view are the same objects as the operators
+ * in its base plan. It is used to represent match results. 
+ *
+ */
+public class OperatorSubPlan implements OperatorPlan {
+
+    private OperatorPlan basePlan;
+    private List<Operator> roots;
+    private List<Operator> leaves;
+    private Set<Operator> operators;
+
+    public OperatorSubPlan(OperatorPlan base) {
+        basePlan = base;
+        roots = new ArrayList<Operator>();
+        leaves = new ArrayList<Operator>();
+        operators = new HashSet<Operator>();
+    }    	    	
+    
+    public OperatorPlan getBasePlan() {
+        return basePlan;
+    }
+    
+    public void add(Operator op) {
+        operators.add(op);
+        leaves.clear();
+        roots.clear();
+    }
+
+    public void connect(Operator from, int fromPos, Operator to, int toPos) {
+        throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+    }
+
+    public void connect(Operator from, Operator to) {
+        throw new UnsupportedOperationException("connect() can not be called on OperatorSubPlan");
+    }
+
+    public Pair<Integer, Integer> disconnect(Operator from, Operator to) throws IOException {
+        throw new UnsupportedOperationException("disconnect() can not be called on OperatorSubPlan");
+    }
+
+    public List<Operator> getSinks() {
+        if (leaves.size() == 0 && operators.size() > 0) {
+            for (Operator op : operators) {       
+                try {
+                    if (getSuccessors(op) == null) {
+                        leaves.add(op);
+                    }
+                }catch(Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return leaves;
+    }
+
+    public Iterator<Operator> getOperators() {
+        return operators.iterator();
+    }
+
+    public List<Operator> getPredecessors(Operator op) throws IOException {
+        List<Operator> l = basePlan.getPredecessors(op);
+        List<Operator> list = null;
+        if (l != null) {
+            for(Operator oper: l) {
+                if (operators.contains(oper)) {
+                    if (list == null) {
+                        list = new ArrayList<Operator>();
+                    }
+                    list.add(oper);
+                }
+            }
+        }
+        
+        return list;
+    }
+
+    public List<Operator> getSources() {
+        if (roots.size() == 0 && operators.size() > 0) {
+            for (Operator op : operators) {       
+                try {
+                    if (getPredecessors(op) == null) {
+                        roots.add(op);
+                    }
+                }catch(Exception e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return roots;
+    }
+
+    public List<Operator> getSuccessors(Operator op) throws IOException {
+        List<Operator> l = basePlan.getSuccessors(op);
+        List<Operator> list = null;
+        if (l != null) {
+            for(Operator oper: l) {
+                if (operators.contains(oper)) {
+                    if (list == null) {
+                        list = new ArrayList<Operator>();
+                    }
+                    list.add(oper);
+                }
+            }
+        }
+        
+        return list;
+    }
+
+    public void remove(Operator op) throws IOException {
+        operators.remove(op);
+        leaves.clear();
+        roots.clear();
+    }
+
+    public int size() {
+        return operators.size();
+    }
+
+    @Override
+    public boolean isEqual(OperatorPlan other) {		
+        return BaseOperatorPlan.isEqual(this, other);
+    }    
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.util.Pair;
+
+public class PlanEdge extends MultiMap<Operator, Operator> {
+    
+    private static final long serialVersionUID = 1L;
+
+    public PlanEdge() {
+        super();
+    }
+
+    /**
+     * @param size Initial size of the map
+     */
+    public PlanEdge(int size) {
+        super(size);
+    }
+
+    /**
+     * Add an element to the map at a given position.
+     * @param key The key to store the value under.  If the key already
+     * exists the value is inserted into the list for that key; it
+     * does not replace the existing values (as in a standard map).
+     * @param value value to store.
+     * @param pos zero-based position in the list to store the new value at.
+     * @throws IndexOutOfBoundsException if pos is out of range for the key's list.
+     */
+    public void put(Operator key, Operator value, int pos) {
+        ArrayList<Operator> list = mMap.get(key);
+        if (list == null) {
+            list = new ArrayList<Operator>();
+            if (pos != 0) {
+                throw new IndexOutOfBoundsException(
+                    "First edge must be at position 0, but got " + pos);
+            }
+            list.add(value);
+            mMap.put(key, list);
+        } else {
+            list.add(pos, value);
+        }
+    }
+
+    /**
+     * Remove one value from an existing key and return which position in
+     * the arraylist the value was at.  If that is the last value
+     * for the key, then remove the key too.
+     * @param key Key to remove the value from.
+     * @param value Value to remove.
+     * @return A pair containing the value being removed and an integer
+     * indicating the position, or null if the key or value does
+     * not exist.  Positions are zero based.
+     */
+    public Pair<Operator, Integer> removeWithPosition(Operator key,
+                                                      Operator value) {
+        ArrayList<Operator> list = mMap.get(key);
+        if (list == null) return null;
+
+        int index = -1;
+        Iterator<Operator> i = list.iterator();
+        Operator keeper = null;
+        for (int j = 0; i.hasNext(); j++) {
+            keeper = i.next();
+            //if (keeper.equals(value)) {
+            if (keeper == value) {
+                i.remove();
+                index = j;
+                break;
+            }
+        }
+        
+        if (index == -1) return null;
+
+        if (list.size() == 0) {
+            mMap.remove(key);
+        }
+
+        return new Pair<Operator, Integer>(keeper, index);
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/PlanVisitor.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.Stack;
+
+import org.apache.pig.impl.plan.VisitorException;
+
+/**
+ * A visitor mechanism for navigating and operating on a plan of 
+ * Operators.  This class contains the logic to traverse the plan.  It does
+ * not visit individual nodes.  That is left to implementing classes
+ * (such as LOVisitor).
+ */
+public abstract class PlanVisitor {
+
+    // TODO Remove this scope value
+    final protected static String DEFAULT_SCOPE = "scope";
+    
+    protected OperatorPlan plan;
+
+    /**
+     * Guaranteed to always point to the walker currently being used.
+     */
+    protected PlanWalker currentWalker;
+
+    private Stack<PlanWalker> walkers;
+
+    /**
+     * Entry point for visiting the plan.
+     * @throws VisitorException if an error is encountered while visiting.
+     */
+    public void visit() throws IOException {
+        currentWalker.walk(this);
+    }
+
+    public OperatorPlan getPlan() {
+        return plan;
+    }
+
+    /**
+     * @param plan OperatorPlan this visitor will visit.
+     * @param walker PlanWalker this visitor will use to traverse the plan.
+     */
+    protected PlanVisitor(OperatorPlan plan, PlanWalker walker) {
+        this.plan = plan;
+        currentWalker = walker;
+        walkers = new Stack<PlanWalker>();
+    }
+
+    /**
+     * Push the current walker onto the stack of saved walkers and begin using
+     * the newly passed walker as the current walker.
+     * @param walker new walker to set as the current walker.
+     */
+    protected void pushWalker(PlanWalker walker) {
+        walkers.push(currentWalker);
+        currentWalker = walker;
+    }
+
+    /**
+     * Pop the most recently saved walker off of the stack and make it the current
+     * walker.  This drops the reference to the walker that was current before.
+     * @throws VisitorException if there are no more walkers on the stack.  In
+     * this case the current walker is not reset.
+     */
+    protected void popWalker() throws VisitorException {
+        if (walkers.empty()) {
+            throw new VisitorException("No more walkers to pop.");
+        }
+        currentWalker = walkers.pop();
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/PlanWalker.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+
+public abstract class PlanWalker {
+
+    protected OperatorPlan plan;
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     */
+    public PlanWalker(OperatorPlan plan) {
+        this.plan = plan;
+    }
+
+    /**
+     * Begin traversing the graph.
+     * @param visitor Visitor this walker is being used by.  This can't be set in
+     * the constructor because the visitor is constructing this class, and does
+     * not yet have a 'this' pointer to send as an argument.
+     * @throws IOException if an error is encountered while walking.
+     */
+    public abstract void walk(PlanVisitor visitor) throws IOException;
+
+    /**
+     * Return a new instance of this same type of walker for a subplan.
+     * When this method is called the same type of walker with the
+     * provided plan set as the plan, must be returned.  This can then be
+     * used to walk subplans.  This allows abstract visitors to clone
+     * walkers without knowing the type of walker their subclasses used.
+     * @param plan Plan for the new walker.
+     * @return Instance of the same type of walker with plan set to plan.
+     */
+    public abstract PlanWalker spawnChildWalker(OperatorPlan plan);
+
+    public OperatorPlan getPlan() {
+        return plan ;
+    }
+    
+    /**
+     * Set the plan for this walker to operate on.
+     * @param plan to walk
+     */
+    public void setPlan(OperatorPlan plan) {
+        this.plan = plan;
+    }
+
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/ReverseDependencyOrderWalker.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Visits a plan in the reverse of the dependency order.  That is, a node is
+ * visited only after every node reachable from it through successor edges has
+ * been visited.  For an acyclic plan this is equivalent to doing a reverse
+ * topological sort on the graph and then visiting it in order.
+ */
+public class ReverseDependencyOrderWalker extends PlanWalker {
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     */
+    public ReverseDependencyOrderWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    /**
+     * @param plan Plan for the new walker.
+     * @return a new ReverseDependencyOrderWalker over the given plan.
+     */
+    @Override
+    public PlanWalker spawnChildWalker(OperatorPlan plan) {
+        return new ReverseDependencyOrderWalker(plan);
+    }
+
+    /**
+     * Begin traversing the graph.  The visiting order is computed completely
+     * before the first operator is handed to the visitor.
+     * @param visitor Visitor this walker is being used by.
+     * @throws IOException if an error is encountered while walking.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        // Not an efficient algorithm, but plans are small so it should be okay.
+        // Starting from each source, doAllSuccessors() recurses down to nodes
+        // without unfinished successors, appends those to the list first and
+        // appends the remaining nodes while unwinding.  A set of finished nodes
+        // keeps any node from being put in the list twice.
+        List<Operator> sources = plan.getSources();
+        if (sources == null) {
+            return;
+        }
+
+        Set<Operator> finished = new HashSet<Operator>();
+        List<Operator> ordered = new ArrayList<Operator>();
+        for (Operator source : sources) {
+            doAllSuccessors(source, finished, ordered);
+        }
+
+        for (Operator next : ordered) {
+            next.accept(visitor);
+        }
+    }
+
+    /**
+     * Appends to fifo, successors first, every node reachable from the given
+     * node that is not yet in seen, followed by the node itself.
+     * A node is recorded in seen only after all of its successors have been
+     * processed, so this recursion does not terminate on a cyclic graph.
+     * @param node node to start from
+     * @param seen nodes that have already been put in fifo
+     * @param fifo receives the nodes in visiting order
+     * @throws IOException declared for overriding implementations and
+     * recursive calls.
+     */
+    protected void doAllSuccessors(Operator node,
+                                   Set<Operator> seen,
+                                   Collection<Operator> fifo) throws IOException {
+        if (seen.contains(node)) {
+            // Already placed in the list.
+            return;
+        }
+
+        // Everything downstream of this node goes into the list first ...
+        Collection<Operator> successors = plan.getSuccessors(node);
+        if (successors != null) {
+            for (Operator successor : successors) {
+                doAllSuccessors(successor, seen, fifo);
+            }
+        }
+
+        // ... and only then the node itself.
+        seen.add(node);
+        fifo.add(node);
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/SubtreeDependencyOrderWalker.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.newplan;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * A {@link DependencyOrderWalker} that does not start from the whole plan but
+ * from a single operator: {@link #walk(PlanVisitor)} visits exactly the
+ * operators that the inherited {@code doAllPredecessors} collects for the
+ * start node (presumably the start node's transitive predecessors followed by
+ * the start node itself; see DependencyOrderWalker).
+ */
+public class SubtreeDependencyOrderWalker extends DependencyOrderWalker {
+    // Operator the walk starts from.  Stays null when the single-argument
+    // constructor is used, as this class offers no setter for it.
+    private Operator startNode;
+
+    /**
+     * Creates a walker without a start node.  Calling
+     * {@link #walk(PlanVisitor)} on such an instance fails with an
+     * IllegalStateException.
+     * @param plan Plan for this walker to traverse.
+     */
+    public SubtreeDependencyOrderWalker(OperatorPlan plan) {
+        super(plan);
+    }
+
+    /**
+     * @param plan Plan for this walker to traverse.
+     * @param startNode operator whose dependencies are to be walked.
+     */
+    public SubtreeDependencyOrderWalker(OperatorPlan plan, Operator startNode) {
+        super(plan);
+        this.startNode = startNode;
+    }
+
+    /**
+     * Visits the operators collected for the start node, in the order in which
+     * {@code doAllPredecessors} produced them.
+     * @param visitor Visitor this walker is being used by.
+     * @throws IOException if an error is encountered while walking.
+     * @throws IllegalStateException if this walker was created without a
+     * start node.
+     */
+    @Override
+    public void walk(PlanVisitor visitor) throws IOException {
+        if (startNode == null) {
+            // Without this check a null operator would be fed into the
+            // traversal and fail later with a far less helpful error.
+            throw new IllegalStateException(
+                    "SubtreeDependencyOrderWalker has no start node; "
+                    + "construct it with (plan, startNode)");
+        }
+
+        List<Operator> fifo = new ArrayList<Operator>();
+        Set<Operator> seen = new HashSet<Operator>();
+
+        // collect startNode together with everything it depends on
+        doAllPredecessors(startNode, seen, fifo);
+
+        for (Operator op : fifo) {
+            op.accept(visitor);
+        }
+    }
+}

Added: hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java?rev=982345&view=auto
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java (added)
+++ hadoop/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java Wed Aug  4 17:46:42 2010
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.pig.newplan.logical;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+
+import org.apache.pig.impl.logicalLayer.ExpressionOperator;
+import org.apache.pig.impl.logicalLayer.LODistinct;
+import org.apache.pig.impl.logicalLayer.LOFilter;
+import org.apache.pig.impl.logicalLayer.RelationalOperator;
+import org.apache.pig.impl.logicalLayer.LOForEach;
+import org.apache.pig.impl.logicalLayer.LOLimit;
+import org.apache.pig.impl.logicalLayer.LOProject;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.impl.plan.DependencyOrderWalker;
+import org.apache.pig.impl.plan.PlanWalker;
+import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.Pair;
+import org.apache.pig.newplan.logical.expression.DereferenceExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.relational.LOInnerLoad;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+
+
+/**
+ * Visitor to translate the inner plan of a foreach from the old logical plan
+ * classes ({@code org.apache.pig.impl.logicalLayer}) to the new ones
+ * ({@code org.apache.pig.newplan.logical}).  It contains methods to translate
+ * all the operators that are allowed in the inner plan of foreach.
+ * <p>
+ * Relational operators are added to the inner plan of the new foreach;
+ * projections become {@link LOInnerLoad} operators and/or expressions in the
+ * expression plan.  The members {@code mPlan}, {@code outerPlan},
+ * {@code exprPlan}, {@code exprOpsMap} and {@code attachedRelationalOp} used
+ * below are not declared in this class; they are inherited.
+ */
+public class ForeachInnerPlanVisitor extends LogicalExpPlanMigrationVistor {
+    // Inner plan of the new foreach; translated relational operators are added here.
+    private org.apache.pig.newplan.logical.relational.LogicalPlan newInnerPlan;
+    // The old foreach whose inner plan is being translated.
+    private LOForEach oldForeach;
+    // First sink of newInnerPlan (referred to as LOGenerate in the comments of
+    // visit(LOProject)); inner loads are connected to it.
+    private org.apache.pig.newplan.logical.relational.LogicalRelationalOperator gen;
+    // Input number handed to the next ProjectExpression created for an inner load.
+    private int inputNo;
+    // Maps operators of the old inner plan to the new relational operators created for them.
+    private HashMap<LogicalOperator, LogicalRelationalOperator> innerOpsMap;
+
+    /**
+     * @param foreach new foreach; its inner plan receives the translated operators
+     * @param oldForeach old foreach that is being translated
+     * @param innerPlan old plan handed to the superclass constructor as its first
+     * argument (presumably the plan this visitor walks)
+     * @param oldLogicalPlan old plan handed to the superclass constructor as its
+     * third argument (presumably the outer plan containing oldForeach)
+     * @throws RuntimeException wrapping any exception raised while looking up
+     * the predecessors of the sink of the new inner plan
+     */
+    public ForeachInnerPlanVisitor(org.apache.pig.newplan.logical.relational.LOForEach foreach, LOForEach oldForeach, LogicalPlan innerPlan, 
+            LogicalPlan oldLogicalPlan) {
+        super(innerPlan, foreach, oldLogicalPlan);
+        newInnerPlan = foreach.getInnerPlan();
+        
+        // get next inputNo: it starts at the number of inputs already
+        // connected to the first sink of the new inner plan
+        gen = (org.apache.pig.newplan.logical.relational.LogicalRelationalOperator)
+            newInnerPlan.getSinks().get(0);
+        try {
+            inputNo = 0;
+            // note: despite its name, 'suc' holds the predecessors of gen
+            List<org.apache.pig.newplan.Operator> suc = newInnerPlan.getPredecessors(gen);
+            if (suc != null) {
+                inputNo = suc.size();
+            }
+        }catch(Exception e) {
+            // the constructor declares no checked exceptions, so rethrow unchecked
+            throw new RuntimeException(e);
+        }
+        this.oldForeach = oldForeach;
+                    
+        innerOpsMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+    }
+    
+    /**
+     * Connects a newly created operator to the translated counterparts of the
+     * predecessors that oldOp has in the old plan (mPlan).
+     * If a translated predecessor already has a successor, newOp is spliced in
+     * between that predecessor and its first successor; otherwise newOp is
+     * simply connected after the predecessor.
+     * @param oldOp operator of the old plan whose predecessors are looked up
+     * @param newOp new operator to connect; its plan is the one being modified
+     * @throws IOException if the plan operations fail
+     */
+    private void translateInnerPlanConnection(LogicalOperator oldOp, org.apache.pig.newplan.Operator newOp) throws IOException {
+        List<LogicalOperator> preds = mPlan.getPredecessors(oldOp);
+        
+        if(preds != null) {
+            for(LogicalOperator pred: preds) {
+                // NOTE(review): newPred is null if pred was never registered in
+                // innerOpsMap, which would fail on the next line - confirm that
+                // every predecessor is always translated before its successors.
+                org.apache.pig.newplan.Operator newPred = innerOpsMap.get(pred);
+                if (newPred.getPlan().getSuccessors(newPred)!=null) {
+                    // newPred is already connected: remove the edge to its first
+                    // successor and re-create it through newOp, reusing the two
+                    // positions returned by disconnect()
+                    org.apache.pig.newplan.Operator newSucc = newOp.getPlan().getSuccessors(newPred).get(0);
+                    Pair<Integer, Integer> pair = newOp.getPlan().disconnect(newPred, newSucc);
+                    newOp.getPlan().connect(newPred, newOp);
+                    newOp.getPlan().connect(newOp, pair.first, newSucc, pair.second);
+                }
+                else {
+                    newOp.getPlan().connect(newPred, newOp);
+                }
+            }
+        }
+    }
+    
+    /**
+     * Translates an old expression plan by walking it in dependency order with
+     * a fresh LogicalExpPlanMigrationVistor.
+     * @param lp old expression plan to translate
+     * @param op new relational operator handed to the migration visitor
+     * @param outerPlan old plan handed to the migration visitor as outer plan
+     * @return the expression plan built by the migration visitor
+     * @throws VisitorException if walking the old plan fails
+     */
+    private LogicalExpressionPlan translateInnerExpressionPlan(LogicalPlan lp, LogicalRelationalOperator op, LogicalPlan outerPlan) throws VisitorException {
+        PlanWalker<LogicalOperator, LogicalPlan> childWalker = 
+            new DependencyOrderWalker<LogicalOperator, LogicalPlan>(lp);
+        
+        LogicalExpPlanMigrationVistor childPlanVisitor = new LogicalExpPlanMigrationVistor(lp, op, outerPlan);
+        
+        childWalker.walk(childPlanVisitor);
+        return childPlanVisitor.exprPlan;
+    }
+    
+    /**
+     * Translates a projection.  Three cases are distinguished by the operator
+     * the projection reads from:
+     * the first predecessor of the old foreach in the outer plan (becomes an
+     * LOInnerLoad plus a ProjectExpression), another LOProject (becomes a
+     * DereferenceExpression), or a relational operator when the projection is
+     * flagged isSendEmptyBagOnEOP (mapped to an already translated expression).
+     * @param project projection to translate
+     * @throws VisitorException if connecting the new operators fails
+     */
+    public void visit(LOProject project) throws VisitorException {
+        LogicalOperator op = project.getExpression();
+        
+        if (op == outerPlan.getPredecessors(oldForeach).get(0)) {
+            // if this projection is to get a field from outer plan, change it
+            // to LOInnerLoad
+            
+            // column is -1 for a star projection, otherwise the projected column
+            LOInnerLoad innerLoad = new LOInnerLoad(newInnerPlan, 
+                    (org.apache.pig.newplan.logical.relational.LOForEach)attachedRelationalOp, 
+                    project.isStar()?-1:project.getCol());
+            
+            newInnerPlan.add(innerLoad);
+            innerOpsMap.put(project, innerLoad);
+            
+            // The logical plan part for this projection is done: add this sub
+            // plan under LOGenerate, then add the matching ProjectExpression
+            // into the expression plan.
+            newInnerPlan.connect(innerLoad, gen);
+            
+            // inputNo is consumed here and advanced for the next inner load
+            ProjectExpression pe = new ProjectExpression(exprPlan, inputNo++, -1, gen);
+            exprPlan.add(pe);
+            exprOpsMap.put(project, pe);
+            try {
+                translateInnerPlanConnection(project, pe);
+            } catch (IOException e) {
+                throw new VisitorException(e);
+            } 
+        }
+
+        // This case occurs when there are two projects one after another
+        // These projects in combination project a column (bag) out of a tuple 
+        // and then project a column out of this projected bag
+        // Here we merge these two projects into one DereferenceExpression
+        else if( op instanceof LOProject ) {
+            LogicalExpression expOper = exprOpsMap.get(op);
+            
+            // nothing is done if the inner project has no translated expression
+            if (expOper!=null) {
+                // Add the dereference in the plan
+                DereferenceExpression dereferenceExp = new DereferenceExpression(
+                        exprPlan, project.getProjection());
+                exprOpsMap.put(project, dereferenceExp);
+                exprPlan.add(dereferenceExp);
+                exprPlan.connect(dereferenceExp, expOper);
+            }
+        } else {
+            if (op instanceof RelationalOperator && project.isSendEmptyBagOnEOP()) {
+                // follow the first predecessor upwards through the old plan
+                // until something that is not a relational operator is reached
+                // (or there is no predecessor left)
+                LogicalOperator currentOp = op;
+                while (currentOp instanceof RelationalOperator) {
+                    List<LogicalOperator> preds = mPlan.getPredecessors(currentOp);
+                    if (preds!=null)
+                        currentOp = preds.get(0);
+                    else break;
+                }
+                // if that operator is an already translated expression, let this
+                // project share the same new expression
+                if (currentOp instanceof ExpressionOperator) {
+                    LogicalExpression exp = exprOpsMap.get(currentOp);
+                    if (exp!=null)
+                        exprOpsMap.put(project, exp);
+                }
+            }
+        }
+    }
+    
+    /**
+     * Translates a sort: creates the new LOSort, copies alias, requested
+     * parallelism and limit, adds and connects it in the new inner plan, and
+     * finally translates the sort column plans.
+     * @param sort old sort operator
+     * @throws VisitorException if connecting or translating fails
+     */
+    public void visit(LOSort sort) throws VisitorException {
+        List<LogicalPlan> sortPlans = sort.getSortColPlans();
+        List<LogicalExpressionPlan> newSortPlans = new ArrayList<LogicalExpressionPlan>();
+        
+        // newSortPlans is still empty here; it is filled at the end of this method.
+        // NOTE(review): this relies on the new LOSort keeping a reference to
+        // this list rather than copying it - confirm against LOSort.
+        org.apache.pig.newplan.logical.relational.LOSort newSort = 
+            new org.apache.pig.newplan.logical.relational.LOSort(newInnerPlan, 
+                    newSortPlans, sort.getAscendingCols(), sort.getUserFunc());
+        
+        newSort.setAlias(sort.getAlias());
+        newSort.setRequestedParallelism(sort.getRequestedParallelism());
+        newSort.setLimit(sort.getLimit());
+        newInnerPlan.add(newSort);
+        innerOpsMap.put(sort, newSort);
+        try {
+            translateInnerPlanConnection(sort, newSort);
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
+        
+        // translate each sort column plan, with mPlan passed as the outer plan
+        for (LogicalPlan sortPlan : sortPlans) {
+            LogicalExpressionPlan newSortPlan = translateInnerExpressionPlan(sortPlan, newSort, mPlan);
+            newSortPlans.add(newSortPlan);
+        }
+    }
+
+    /**
+     * Translates a limit: creates the new LOLimit with the same limit value,
+     * copies alias and requested parallelism, then adds and connects it in the
+     * new inner plan.
+     * @param limit old limit operator
+     * @throws VisitorException if connecting fails
+     */
+    public void visit(LOLimit limit) throws VisitorException {
+        org.apache.pig.newplan.logical.relational.LOLimit newLimit = 
+            new org.apache.pig.newplan.logical.relational.LOLimit(newInnerPlan, 
+                    limit.getLimit());
+        
+        newLimit.setAlias(limit.getAlias());
+        newLimit.setRequestedParallelism(limit.getRequestedParallelism());
+        newInnerPlan.add(newLimit);
+        innerOpsMap.put(limit, newLimit);
+        try {
+            translateInnerPlanConnection(limit, newLimit);
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }        
+    }
+    
+    /**
+     * Translates a distinct: creates the new LODistinct, copies alias and
+     * requested parallelism, then adds and connects it in the new inner plan.
+     * @param distinct old distinct operator
+     * @throws VisitorException if connecting fails
+     */
+    public void visit(LODistinct distinct) throws VisitorException {
+        org.apache.pig.newplan.logical.relational.LODistinct newDistinct = 
+            new org.apache.pig.newplan.logical.relational.LODistinct(newInnerPlan);
+        
+        newDistinct.setAlias(distinct.getAlias());
+        newDistinct.setRequestedParallelism(distinct.getRequestedParallelism());
+        newInnerPlan.add(newDistinct);
+        innerOpsMap.put(distinct, newDistinct);
+        try {
+            translateInnerPlanConnection(distinct, newDistinct);
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
+    }
+    
+    /**
+     * Translates a filter: creates the new LOFilter, copies alias and requested
+     * parallelism, translates the comparison plan into the new filter plan,
+     * then adds and connects the filter in the new inner plan.
+     * @param filter old filter operator
+     * @throws VisitorException if translating or connecting fails
+     */
+    public void visit(LOFilter filter) throws VisitorException {
+        org.apache.pig.newplan.logical.relational.LOFilter newFilter = 
+            new org.apache.pig.newplan.logical.relational.LOFilter(newInnerPlan);
+        
+        newFilter.setAlias(filter.getAlias());
+        newFilter.setRequestedParallelism(filter.getRequestedParallelism());
+        // the condition is translated before the filter is added to the plan,
+        // with mPlan passed as the outer plan
+        LogicalExpressionPlan newFilterPlan = translateInnerExpressionPlan(filter.getComparisonPlan(), newFilter, mPlan);
+        newFilter.setFilterPlan(newFilterPlan);
+        newInnerPlan.add(newFilter);
+        innerOpsMap.put(filter, newFilter);
+        try {
+            translateInnerPlanConnection(filter, newFilter);
+        } catch (IOException e) {
+            throw new VisitorException(e);
+        }
+    }
+}
\ No newline at end of file