You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/17 23:08:09 UTC

svn commit: r1050503 [1/2] - in /pig/trunk: src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/ ...

Author: yanz
Date: Fri Dec 17 22:08:08 2010
New Revision: 1050503

URL: http://svn.apache.org/viewvc?rev=1050503&view=rev
Log:
PIG-1712: ILLUSTRATE rework (yanz)

Modified:
    pig/trunk/src/org/apache/pig/PigServer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
    pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
    pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
    pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java
    pig/trunk/src/org/apache/pig/newplan/PlanEdge.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
    pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
    pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
    pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
    pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java
    pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
    pig/trunk/src/org/apache/pig/pen/Illustrator.java
    pig/trunk/src/org/apache/pig/pen/IllustratorAttacher.java
    pig/trunk/src/org/apache/pig/pen/LineageTrimmingVisitor.java
    pig/trunk/src/org/apache/pig/pen/LocalMapReduceSimulator.java
    pig/trunk/src/org/apache/pig/pen/util/DisplayExamples.java
    pig/trunk/src/org/apache/pig/pen/util/MetricEvaluation.java
    pig/trunk/src/org/apache/pig/pen/util/PreOrderDepthFirstWalker.java
    pig/trunk/test/org/apache/pig/test/TestExampleGenerator.java
    pig/trunk/test/org/apache/pig/test/data/GoldenFiles/MRC18.gld

Modified: pig/trunk/src/org/apache/pig/PigServer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/PigServer.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/PigServer.java (original)
+++ pig/trunk/src/org/apache/pig/PigServer.java Fri Dec 17 22:08:08 2010
@@ -102,6 +102,7 @@ import org.apache.pig.impl.util.Properti
 import org.apache.pig.impl.util.Utils;
 import org.apache.pig.newplan.logical.LogicalPlanMigrationVistor;
 import org.apache.pig.newplan.logical.optimizer.LogicalPlanOptimizer;
+import org.apache.pig.newplan.Operator;
 import org.apache.pig.pen.ExampleGenerator;
 import org.apache.pig.scripting.ScriptEngine;
 import org.apache.pig.tools.grunt.GruntParser;
@@ -1125,7 +1126,7 @@ public class PigServer {
         return currDAG.getAliasOp().keySet();
     }
 
-    public Map<LogicalOperator, DataBag> getExamples(String alias) throws IOException {
+    public Map<Operator, DataBag> getExamples(String alias) throws IOException {
         LogicalPlan plan = null;
         try {        
             if (currDAG.isBatchOn() && alias != null) {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/HExecutionEngine.java Fri Dec 17 22:08:08 2010
@@ -107,10 +107,10 @@ public class HExecutionEngine {
     // map from LOGICAL key to into about the execution
     protected Map<OperatorKey, MapRedResult> materializedResults;
     
-    protected Map<LogicalOperator, PhysicalOperator> logToPhyMap;
     protected Map<LogicalOperator, LogicalRelationalOperator> opsMap;
     protected Map<Operator, PhysicalOperator> newLogToPhyMap;
     private Map<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> forEachInnerOpMap;
+    private org.apache.pig.newplan.logical.relational.LogicalPlan newPreoptimizedPlan;
     
     public HExecutionEngine(PigContext pigContext) {
         this.pigContext = pigContext;
@@ -265,6 +265,8 @@ public class HExecutionEngine {
                 opsMap = visitor.getOldToNewLOOpMap();
                 forEachInnerOpMap = visitor.getForEachInnerMap();
                 org.apache.pig.newplan.logical.relational.LogicalPlan newPlan = visitor.getNewLogicalPlan();
+                newPreoptimizedPlan =
+                    new org.apache.pig.newplan.logical.relational.LogicalPlan(newPlan);
                 
                 if (pigContext.inIllustrator) {
                     // disable all PO-specific optimizations
@@ -342,33 +344,28 @@ public class HExecutionEngine {
         }
     }
     
-    public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
-        if (logToPhyMap != null)
-            return logToPhyMap;
-        else if (newLogToPhyMap != null) {
-            Map<LogicalOperator, PhysicalOperator> result = new HashMap<LogicalOperator, PhysicalOperator>();
-            for (LogicalOperator lo: opsMap.keySet()) {
-                result.put(lo, newLogToPhyMap.get(opsMap.get(lo))); 
-            }
-            return result;
-        } else
-            return null;
-    }
-    
-    public Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> getForEachInnerLogToPhyMap() {
-        Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> result =
-            new HashMap<LOForEach, Map<LogicalOperator, PhysicalOperator>>();
+    public Map<Operator, PhysicalOperator> getLogToPhyMap() {
+        return newLogToPhyMap;
+    }
+    
+    public Map<org.apache.pig.newplan.logical.relational.LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> getForEachInnerLogToPhyMap() {
+        Map<org.apache.pig.newplan.logical.relational.LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> result =
+            new HashMap<org.apache.pig.newplan.logical.relational.LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>>();
         for (Map.Entry<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> entry :
             forEachInnerOpMap.entrySet()) {
-            Map<LogicalOperator, PhysicalOperator> innerOpMap = new HashMap<LogicalOperator, PhysicalOperator>();
+            Map<LogicalRelationalOperator, PhysicalOperator> innerOpMap = new HashMap<LogicalRelationalOperator, PhysicalOperator>();
             for (Map.Entry<LogicalOperator, LogicalRelationalOperator> innerEntry : entry.getValue().entrySet()) {
-                innerOpMap.put(innerEntry.getKey(), newLogToPhyMap.get(innerEntry.getValue()));
+                innerOpMap.put(innerEntry.getValue(), newLogToPhyMap.get(innerEntry.getValue()));
             }
-            result.put(entry.getKey(), innerOpMap);
+            result.put((org.apache.pig.newplan.logical.relational.LOForEach) (opsMap.get(entry.getKey())), innerOpMap);
         }
         return result;
     }
     
+    public org.apache.pig.newplan.logical.relational.LogicalPlan getNewPlan() {
+        return newPreoptimizedPlan;
+    }
+    
     public static class SortInfoSetter extends LogicalRelationalNodesVisitor {
 
         public SortInfoSetter(OperatorPlan plan) throws FrontendException {

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceLauncher.java Fri Dec 17 22:08:08 2010
@@ -524,7 +524,7 @@ public class MapReduceLauncher extends L
         if (isMultiQuery) {
             // reduces the number of MROpers in the MR plan generated 
             // by multi-query (multi-store) script.
-            MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan);
+            MultiQueryOptimizer mqOptimizer = new MultiQueryOptimizer(plan, pc.inIllustrator);
             mqOptimizer.visit();
         }
         

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MapReduceOper.java Fri Dec 17 22:08:08 2010
@@ -20,8 +20,6 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.ByteArrayOutputStream;
 import java.util.HashSet;
 import java.util.Set;
-import java.util.Map;
-import java.util.HashMap;
 
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
@@ -32,6 +30,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.Operator;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
 
 /**
  * An operator model for a Map Reduce job. 
@@ -150,7 +149,7 @@ public class MapReduceOper extends Opera
 	
 	// Map of the physical operator in physical plan to the one in MR plan: only needed
 	// if the physical operator is changed/replaced in MR compilation due to, e.g., optimization
-	public Map<PhysicalOperator, PhysicalOperator> phyToMRMap;
+	public MultiMap<PhysicalOperator, PhysicalOperator> phyToMRMap;
 	
 	private static enum OPER_FEATURE {
 	    NONE,
@@ -175,7 +174,7 @@ public class MapReduceOper extends Opera
         scalars = new HashSet<PhysicalOperator>();
         nig = NodeIdGenerator.getGenerator();
         scope = k.getScope();
-        phyToMRMap = new HashMap<PhysicalOperator, PhysicalOperator>();
+        phyToMRMap = new MultiMap<PhysicalOperator, PhysicalOperator>();
     }
 
     /*@Override

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/MultiQueryOptimizer.java Fri Dec 17 22:08:08 2010
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.Iterator;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -77,12 +78,14 @@ class MultiQueryOptimizer extends MROpPl
     
     private String scope;
     
-    MultiQueryOptimizer(MROperPlan plan) {
+    private boolean inIllustrator = false;
+    
+    MultiQueryOptimizer(MROperPlan plan, boolean inIllustrator) {
         super(plan, new ReverseDependencyOrderWalker<MapReduceOper, MROperPlan>(plan));
         nig = NodeIdGenerator.getGenerator();
         List<MapReduceOper> roots = plan.getRoots();
         scope = roots.get(0).getOperatorKey().getScope();
-        
+        this.inIllustrator = inIllustrator;
         log.info("MR plan size before optimization: " + plan.size());
     }
 
@@ -290,13 +293,28 @@ class MultiQueryOptimizer extends MROpPl
                 PhysicalOperator opSucc = succ.mapPlan.getSuccessors(op).get(0);
                 PhysicalPlan clone = null;
                 try {
+                    if (inIllustrator)
+                        pl.setOpMap(succ.phyToMRMap);
                     clone = pl.clone();
+                    if (inIllustrator)
+                        pl.resetOpMap();
                 } catch (CloneNotSupportedException e) {
                     int errCode = 2127;
                     String msg = "Internal Error: Cloning of plan failed for optimization.";
                     throw new OptimizerException(msg, errCode, PigException.BUG, e);
                 }
                 succ.mapPlan.remove(op);
+                
+                if (inIllustrator) {
+                    // need to remove the LOAD since data from load on temporary files can't be handled by illustrator
+                    for (Iterator<PhysicalOperator> it = pl.iterator(); it.hasNext(); )
+                    {
+                        PhysicalOperator po = it.next();
+                        if (po instanceof POLoad)
+                            succ.phyToMRMap.removeKey(po);
+                    }
+                }
+                
                 while (!clone.isEmpty()) {
                     PhysicalOperator oper = clone.getLeaves().get(0);
                     clone.remove(oper);

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhysicalPlan.java Fri Dec 17 22:08:08 2010
@@ -24,7 +24,6 @@ import java.io.PrintStream;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.HashMap;
-import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 
@@ -37,6 +36,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.OperatorPlan;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.MultiMap;
 
 /**
  * 
@@ -57,6 +57,8 @@ public class PhysicalPlan extends Operat
     // and that there is no more input expected.
     public boolean endOfAllInput = false;
 
+    private MultiMap<PhysicalOperator, PhysicalOperator> opmap = null;
+    
     public PhysicalPlan() {
         super();
     }
@@ -224,6 +226,8 @@ public class PhysicalPlan extends Operat
         for (PhysicalOperator op : mOps.keySet()) {
             PhysicalOperator c = op.clone();
             clone.add(c);
+            if (opmap != null)
+                opmap.put(op, c);
             matches.put(op, c);
         }
 
@@ -296,6 +300,12 @@ public class PhysicalPlan extends Operat
         return clone;
     }
     
+    public void setOpMap(MultiMap<PhysicalOperator, PhysicalOperator> opmap) {
+        this.opmap = opmap;
+    }
     
-    
+    public void resetOpMap()
+    {
+        opmap = null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/LOLimit.java Fri Dec 17 22:08:08 2010
@@ -52,6 +52,10 @@ public class LOLimit extends RelationalO
         return mPlan.getPredecessors(this).get(0);
     }
 
+    public LogicalOperator getInput(LogicalPlan plan) {
+        return plan.getPredecessors(this).get(0);
+    }
+    
     public long getLimit() {
         return mLimit;
     }

Modified: pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/impl/logicalLayer/LOUnion.java Fri Dec 17 22:08:08 2010
@@ -55,6 +55,10 @@ public class LOUnion extends RelationalO
         return mPlan.getPredecessors(this);
     }
     
+    public List<LogicalOperator> getInputs(LogicalPlan plan) {
+        return plan.getPredecessors(this);
+    }
+    
     @Override
     public Schema getSchema() throws FrontendException {
         if (!mIsSchemaComputed) {

Modified: pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/BaseOperatorPlan.java Fri Dec 17 22:08:08 2010
@@ -54,6 +54,18 @@ public abstract class BaseOperatorPlan i
         softToEdges = new PlanEdge();
     }
     
+    @SuppressWarnings("unchecked")
+    public BaseOperatorPlan(BaseOperatorPlan other) {
+        // (shallow) copy constructor
+        ops = (Set<Operator>) ((HashSet<Operator>) other.ops).clone();
+        roots = (List<Operator>) ((ArrayList) other.roots).clone();
+        leaves = (List<Operator>) ((ArrayList) other.leaves).clone();
+        fromEdges = other.fromEdges.shallowClone();
+        toEdges = other.toEdges.shallowClone();
+        softFromEdges = other.softFromEdges.shallowClone();
+        softToEdges = other.softToEdges.shallowClone();
+    }
+    
     /**
      * Get number of nodes in the plan.
      */

Modified: pig/trunk/src/org/apache/pig/newplan/PlanEdge.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/PlanEdge.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/PlanEdge.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/PlanEdge.java Fri Dec 17 22:08:08 2010
@@ -20,6 +20,7 @@ package org.apache.pig.newplan;
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.Map;
 
 import org.apache.pig.impl.util.MultiMap;
 import org.apache.pig.impl.util.Pair;
@@ -100,4 +101,14 @@ public class PlanEdge extends MultiMap<O
         return new Pair<Operator, Integer>(keeper, index);
     }
 
+    public PlanEdge shallowClone() {
+        // shallow clone: elements not cloned
+        PlanEdge result = new PlanEdge();
+        for (Map.Entry<Operator, ArrayList<Operator>> entry : mMap.entrySet()) {
+            ArrayList<Operator> list = new ArrayList<Operator>();
+            list.addAll(entry.getValue());
+            result.put(entry.getKey(), list);
+        }
+        return result;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Fri Dec 17 22:08:08 2010
@@ -292,4 +292,8 @@ public class LOCogroup extends LogicalRe
         groupKeyUidOnlySchema = null;
         generatedInputUids = new HashMap<Integer,Long>();
     }
+    
+    public List<Operator> getInputs(LogicalPlan plan) {
+      return plan.getPredecessors(this);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCross.java Fri Dec 17 22:08:08 2010
@@ -21,6 +21,7 @@ import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
 
@@ -93,4 +94,8 @@ public class LOCross extends LogicalRela
             return false;
         }
     }
+    
+    public List<Operator>  getInputs() {
+        return plan.getPredecessors(this);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LODistinct.java Fri Dec 17 22:08:08 2010
@@ -59,4 +59,8 @@ public class LODistinct extends LogicalR
             return false;
         }
     }
+    
+    public Operator getInput(LogicalPlan plan) {
+        return plan.getPredecessors(this).get(0);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOFilter.java Fri Dec 17 22:08:08 2010
@@ -73,5 +73,9 @@ public class LOFilter extends LogicalRel
             return false;
         }
     }
+    
+    public Operator getInput(LogicalPlan plan) {
+        return plan.getPredecessors(this).get(0);
+    }
 }
 

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLimit.java Fri Dec 17 22:08:08 2010
@@ -18,6 +18,7 @@
 package org.apache.pig.newplan.logical.relational;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.PlanVisitor;
 
@@ -69,4 +70,8 @@ public class LOLimit extends LogicalRela
         else
             return false;
     }
+    
+    public Operator getInput(LogicalPlan plan) {
+        return plan.getPredecessors(this).get(0);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOLoad.java Fri Dec 17 22:08:08 2010
@@ -45,6 +45,7 @@ public class LOLoad extends LogicalRelat
     private List<Integer> requiredFields = null;
     private boolean castInserted = false;
     private LogicalSchema uidOnlySchema;
+    private String schemaFile = null;
 
     /**
      * 
@@ -57,9 +58,15 @@ public class LOLoad extends LogicalRelat
        super("LOLoad", plan);
        scriptSchema = schema;
        fs = loader;
+       if (loader != null)
+           schemaFile = loader.getFileName();
        this.conf = conf;
     }
     
+    public String getSchemaFile() {
+        return schemaFile;
+    }
+    
     public LoadFunc getLoadFunc() throws FrontendException {
         try { 
             if (loadFunc == null && fs!=null) {

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOSort.java Fri Dec 17 22:08:08 2010
@@ -25,6 +25,7 @@ import org.apache.pig.FuncSpec;
 import org.apache.pig.SortColInfo;
 import org.apache.pig.SortInfo;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.PlanVisitor;
@@ -157,4 +158,8 @@ public class LOSort extends LogicalRelat
         }
         return checkEquality((LogicalRelationalOperator)other);
     }
+    
+    public Operator getInput(LogicalPlan plan) {
+        return plan.getPredecessors(this).get(0);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOUnion.java Fri Dec 17 22:08:08 2010
@@ -125,4 +125,12 @@ public class LOUnion extends LogicalRela
     public void resetUid() {
         uidMapping = new ArrayList<Pair<Long, Long>>();
     }
+    
+    public List<Operator> getInputs() {
+        return plan.getPredecessors(this);
+    }
+    
+    public List<Operator> getInputs(LogicalPlan plan) {
+        return plan.getPredecessors(this);
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalPlan.java Fri Dec 17 22:08:08 2010
@@ -19,6 +19,7 @@
 package org.apache.pig.newplan.logical.relational;
 
 import java.io.PrintStream;
+import java.util.HashSet;
 
 import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.newplan.BaseOperatorPlan;
@@ -32,6 +33,15 @@ import org.apache.pig.newplan.logical.op
  * each relational operator.
  */
 public class LogicalPlan extends BaseOperatorPlan {
+  
+    public LogicalPlan(LogicalPlan other) {
+        // shallow copy constructor
+        super(other);
+    }
+    
+    public LogicalPlan() {
+        super();
+    }
     
     /**
      * Equality is checked by calling equals on every leaf in the plan.  This

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogicalSchema.java Fri Dec 17 22:08:08 2010
@@ -80,12 +80,12 @@ public class LogicalSchema {
                 if( schema == null ) {
                     return ( alias + uidString + ":bag{}" );
                 }
-                return ( alias + uidString + ":bag{" + schema.toString() + "}" );
+                return ( alias + uidString + ":bag{" + schema.toString(verbose) + "}" );
             } else if( type == DataType.TUPLE ) {
                 if( schema == null ) {
                     return ( alias + uidString + ":tuple{}" );
                 }
-                return ( alias + uidString + ":tuple(" + schema.toString() + ")" );
+                return ( alias + uidString + ":tuple(" + schema.toString(verbose) + ")" );
             }
             return ( alias + uidString + ":" + DataType.findTypeName(type) );
         }

Modified: pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java Fri Dec 17 22:08:08 2010
@@ -26,6 +26,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.Collection;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -37,71 +38,75 @@ import org.apache.pig.data.DataByteArray
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
-import org.apache.pig.impl.logicalLayer.BinaryExpressionOperator;
-import org.apache.pig.impl.logicalLayer.ExpressionOperator;
+import org.apache.pig.newplan.logical.expression.BinaryExpression;
 import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOAdd;
-import org.apache.pig.impl.logicalLayer.LOAnd;
-import org.apache.pig.impl.logicalLayer.LOCast;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOLimit;
-import org.apache.pig.impl.logicalLayer.LOConst;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LODistinct;
-import org.apache.pig.impl.logicalLayer.LODivide;
-import org.apache.pig.impl.logicalLayer.LOEqual;
-import org.apache.pig.impl.logicalLayer.LOFilter;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOGreaterThan;
-import org.apache.pig.impl.logicalLayer.LOGreaterThanEqual;
-import org.apache.pig.impl.logicalLayer.LOLesserThan;
-import org.apache.pig.impl.logicalLayer.LOLesserThanEqual;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LOMod;
-import org.apache.pig.impl.logicalLayer.LOMultiply;
-import org.apache.pig.impl.logicalLayer.LONot;
-import org.apache.pig.impl.logicalLayer.LONotEqual;
-import org.apache.pig.impl.logicalLayer.LOOr;
-import org.apache.pig.impl.logicalLayer.LOProject;
-import org.apache.pig.impl.logicalLayer.LORegexp;
-import org.apache.pig.impl.logicalLayer.LOSort;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOStore;
-import org.apache.pig.impl.logicalLayer.LOSubtract;
-import org.apache.pig.impl.logicalLayer.LOUnion;
-import org.apache.pig.impl.logicalLayer.LOVisitor;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.newplan.logical.expression.AddExpression;
+import org.apache.pig.newplan.logical.expression.AndExpression;
+import org.apache.pig.newplan.logical.expression.CastExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
+import org.apache.pig.newplan.logical.relational.LOCogroup;
+import org.apache.pig.newplan.logical.relational.LOLimit;
+import org.apache.pig.newplan.logical.expression.ConstantExpression;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LODistinct;
+import org.apache.pig.newplan.logical.expression.DivideExpression;
+import org.apache.pig.newplan.logical.expression.EqualExpression;
+import org.apache.pig.newplan.logical.relational.LOFilter;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.expression.GreaterThanExpression;
+import org.apache.pig.newplan.logical.expression.GreaterThanEqualExpression;
+import org.apache.pig.newplan.logical.expression.LessThanExpression;
+import org.apache.pig.newplan.logical.expression.LessThanEqualExpression;
+import org.apache.pig.newplan.logical.relational.LOLoad;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.logical.expression.ModExpression;
+import org.apache.pig.newplan.logical.expression.LogicalExpression;
+import org.apache.pig.newplan.logical.expression.MultiplyExpression;
+import org.apache.pig.newplan.logical.expression.NotExpression;
+import org.apache.pig.newplan.logical.expression.NotEqualExpression;
+import org.apache.pig.newplan.logical.expression.OrExpression;
+import org.apache.pig.newplan.logical.expression.ProjectExpression;
+import org.apache.pig.newplan.logical.expression.RegexExpression;
+import org.apache.pig.newplan.logical.relational.LOSort;
+import org.apache.pig.newplan.logical.relational.LOSplit;
+import org.apache.pig.newplan.logical.relational.LOStore;
+import org.apache.pig.newplan.logical.expression.SubtractExpression;
+import org.apache.pig.newplan.logical.relational.LOUnion;
+import org.apache.pig.newplan.OperatorPlan;
+import org.apache.pig.newplan.Operator;
+import org.apache.pig.newplan.logical.relational.LogicalSchema;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalNodesVisitor;
 import org.apache.pig.pen.util.ExampleTuple;
 import org.apache.pig.pen.util.PreOrderDepthFirstWalker;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
+import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.impl.io.FileSpec;
 
 //This is used to generate synthetic data
 //Synthetic data generation is done by making constraint tuples for each operator as we traverse the plan
 //and try to replace the constraints with values as far as possible. We only deal with simple conditions right now
 
-public class AugmentBaseDataVisitor extends LOVisitor {
+public class AugmentBaseDataVisitor extends LogicalRelationalNodesVisitor {
 
     Map<LOLoad, DataBag> baseData = null;
     Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
-    Map<LogicalOperator, DataBag> derivedData = null;
+    Map<Operator, DataBag> derivedData = null;
     private boolean limit = false;
-    private final Map<LogicalOperator, PhysicalOperator> logToPhysMap;
+    private final Map<Operator, PhysicalOperator> logToPhysMap;
     private Map<LOLimit, Long> oriLimitMap;
 
-    Map<LogicalOperator, DataBag> outputConstraintsMap = new HashMap<LogicalOperator, DataBag>();
+    Map<Operator, DataBag> outputConstraintsMap = new HashMap<Operator, DataBag>();
 
     Log log = LogFactory.getLog(getClass());
 
     // Augmentation moves from the leaves to root and hence needs a
     // depthfirstwalker
-    public AugmentBaseDataVisitor(LogicalPlan plan,
-            Map<LogicalOperator, PhysicalOperator> logToPhysMap,
+    public AugmentBaseDataVisitor(OperatorPlan plan,
+            Map<Operator, PhysicalOperator> logToPhysMap,
             Map<LOLoad, DataBag> baseData,
-            Map<LogicalOperator, DataBag> derivedData) {
-        super(plan, new PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>(
+            Map<Operator, DataBag> derivedData) throws FrontendException {
+        super(plan, new PreOrderDepthFirstWalker(
                 plan));
         this.baseData = baseData;
         this.derivedData = derivedData;
@@ -112,7 +117,43 @@ public class AugmentBaseDataVisitor exte
         limit = true;
     }
     
-    public Map<LOLoad, DataBag> getNewBaseData() {
+    public Map<LOLoad, DataBag> getNewBaseData() throws ExecException {
+        // consolidate base data from different LOADs on the same inputs
+        MultiMap<FileSpec, DataBag> inputDataMap = new MultiMap<FileSpec, DataBag>();
+        for (Map.Entry<LOLoad, DataBag> e : newBaseData.entrySet()) {
+            inputDataMap.put(e.getKey().getFileSpec(), e.getValue());
+        }
+        
+        int index = 0;
+        for (FileSpec fs : inputDataMap.keySet()) {
+            int maxSchemaSize = 0;
+            Tuple tupleOfMaxSchemaSize = null;
+            for (DataBag bag : inputDataMap.get(fs)) {
+                if (bag.size() > 0) {
+                    int size = 0;
+                    Tuple t = null;
+                    t = bag.iterator().next();
+                    size = t.size();
+                    if (size > maxSchemaSize) {
+                        maxSchemaSize = size;
+                        tupleOfMaxSchemaSize = t;
+                    }
+                }
+            }
+            for (DataBag bag : inputDataMap.get(fs)) {
+                if (bag.size() > 0) {
+                    for (Iterator<Tuple> it = bag.iterator(); it.hasNext();) {
+                        Tuple t = it.next();
+                        for (int i = t.size(); i < maxSchemaSize; ++i) {
+                            t.append(tupleOfMaxSchemaSize.get(i));
+                        }
+                    }
+                }
+            }
+            index++;
+        }
+        
+        
         for (Map.Entry<LOLoad, DataBag> e : baseData.entrySet()) {
             DataBag bag = newBaseData.get(e.getKey());
             if (bag == null) {
@@ -129,8 +170,8 @@ public class AugmentBaseDataVisitor exte
     }
     
     @Override
-    protected void visit(LOCogroup cg) throws VisitorException {
-        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+    public void visit(LOCogroup cg) throws FrontendException {
+        if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
             return;
         // we first get the outputconstraints for the current cogroup
         DataBag outputConstraints = outputConstraintsMap.get(cg);
@@ -141,16 +182,14 @@ public class AugmentBaseDataVisitor exte
         List<List<Integer>> groupSpecs = new LinkedList<List<Integer>>();
         int numCols = -1;
 
-        int minGroupSize = (cg.getInputs().size() == 1) ? 1 : 2;
-
-        for (LogicalOperator op : cg.getInputs()) {
-            List<LogicalPlan> groupByPlans = (List<LogicalPlan>) cg
-                    .getGroupByPlans().get(op);
+        for (int index = 0; index < cg.getInputs((LogicalPlan)plan).size(); ++index) {
+            Collection<LogicalExpressionPlan> groupByPlans = (List<LogicalExpressionPlan>) cg
+                    .getExpressionPlans().get(index);
             List<Integer> groupCols = new ArrayList<Integer>();
-            for (LogicalPlan plan : groupByPlans) {
-                LogicalOperator leaf = plan.getLeaves().get(0);
-                if (leaf instanceof LOProject) {
-                    groupCols.add(((LOProject) leaf).getCol());
+            for (LogicalExpressionPlan plan : groupByPlans) {
+                Operator leaf = plan.getSinks().get(0);
+                if (leaf instanceof ProjectExpression) {
+                    groupCols.add(Integer.valueOf(((ProjectExpression) leaf).getColNum()));
                 } else {
                     ableToHandle = false;
                     break;
@@ -173,7 +212,7 @@ public class AugmentBaseDataVisitor exte
         try {
             if (ableToHandle) {
                 // we need to go through the output constraints first
-                int numInputs = cg.getInputs().size();
+                int numInputs = cg.getInputs((LogicalPlan) plan).size();
                 if (outputConstraints != null) {
                     for (Iterator<Tuple> it = outputConstraints.iterator(); it
                             .hasNext();) {
@@ -182,19 +221,19 @@ public class AugmentBaseDataVisitor exte
 
                         for (int input = 0; input < numInputs; input++) {
 
-                            int numInputFields = cg.getInputs().get(input)
+                            int numInputFields = ((LogicalRelationalOperator) cg.getInputs((LogicalPlan) plan).get(input))
                                     .getSchema().size();
                             List<Integer> groupCols = groupSpecs.get(input);
 
                             DataBag output = outputConstraintsMap.get(cg
-                                    .getInputs().get(input));
+                                    .getInputs((LogicalPlan) plan).get(input));
                             if (output == null) {
                                 output = BagFactory.getInstance()
                                         .newDefaultBag();
-                                outputConstraintsMap.put(cg.getInputs().get(
+                                outputConstraintsMap.put(cg.getInputs((LogicalPlan) plan).get(
                                         input), output);
                             }
-                            for (int i = 0; i < minGroupSize; i++) {
+                            for (int i = 0; i < 2; i++) {
                                 Tuple inputConstraint = GetGroupByInput(
                                         groupLabel, groupCols, numInputFields);
                                 if (inputConstraint != null)
@@ -212,18 +251,18 @@ public class AugmentBaseDataVisitor exte
                     Object groupLabel = groupTup.get(0);
 
                     for (int input = 0; input < numInputs; input++) {
-                        int numInputFields = cg.getInputs().get(input)
+                        int numInputFields = ((LogicalRelationalOperator)cg.getInputs((LogicalPlan) plan).get(input))
                                 .getSchema().size();
                         List<Integer> groupCols = groupSpecs.get(input);
 
                         DataBag output = outputConstraintsMap.get(cg
-                                .getInputs().get(input));
+                                .getInputs((LogicalPlan) plan).get(input));
                         if (output == null) {
                             output = BagFactory.getInstance().newDefaultBag();
-                            outputConstraintsMap.put(cg.getInputs().get(input),
+                            outputConstraintsMap.put(cg.getInputs((LogicalPlan) plan).get(input),
                                     output);
                         }
-                        int numTupsToAdd = minGroupSize
+                        int numTupsToAdd = 2
                                 - (int) ((DataBag) groupTup.get(input + 1))
                                         .size();
                         for (int i = 0; i < numTupsToAdd; i++) {
@@ -239,29 +278,29 @@ public class AugmentBaseDataVisitor exte
             log
                     .error("Error visiting Cogroup during Augmentation phase of Example Generator! "
                             + e.getMessage());
-            throw new VisitorException(
+            throw new FrontendException(
                     "Error visiting Cogroup during Augmentation phase of Example Generator! "
                             + e.getMessage());
         }
     }
 
     @Override
-    protected void visit(LOCross cs) throws VisitorException {
+    public void visit(LOCross cs) throws FrontendException {
 
     }
 
     @Override
-    protected void visit(LODistinct dt) throws VisitorException {
-        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+    public void visit(LODistinct dt) throws FrontendException {
+        if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
             return;
     
         DataBag outputConstraints = outputConstraintsMap.get(dt);
         outputConstraintsMap.remove(dt);
 
-        DataBag inputConstraints = outputConstraintsMap.get(dt.getInput());
+        DataBag inputConstraints = outputConstraintsMap.get(dt.getInput((LogicalPlan) plan));
         if (inputConstraints == null) {
             inputConstraints = BagFactory.getInstance().newDefaultBag();
-            outputConstraintsMap.put(dt.getInput(), inputConstraints);
+            outputConstraintsMap.put(dt.getInput((LogicalPlan) plan), inputConstraints);
         }
     
         if (outputConstraints != null && outputConstraints.size() > 0) {
@@ -273,7 +312,7 @@ public class AugmentBaseDataVisitor exte
         
         boolean emptyInputConstraints = inputConstraints.size() == 0;
         if (emptyInputConstraints) {
-            DataBag inputData = derivedData.get(dt.getInput());
+            DataBag inputData = derivedData.get(dt.getInput((LogicalPlan) plan));
             for (Iterator<Tuple> it = inputData.iterator(); it.hasNext();)
             {
                 inputConstraints.add(it.next());
@@ -300,22 +339,22 @@ public class AugmentBaseDataVisitor exte
     }
 
     @Override
-    protected void visit(LOFilter filter) throws VisitorException {
-        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+    public void visit(LOFilter filter) throws FrontendException {
+        if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
             return;
         
         DataBag outputConstraints = outputConstraintsMap.get(filter);
         outputConstraintsMap.remove(filter);
 
-        LogicalPlan filterCond = filter.getComparisonPlan();
-        DataBag inputConstraints = outputConstraintsMap.get(filter.getInput());
+        LogicalExpressionPlan filterCond = filter.getFilterPlan();
+        DataBag inputConstraints = outputConstraintsMap.get(filter.getInput((LogicalPlan) plan));
         if (inputConstraints == null) {
             inputConstraints = BagFactory.getInstance().newDefaultBag();
-            outputConstraintsMap.put(filter.getInput(), inputConstraints);
+            outputConstraintsMap.put(filter.getInput((LogicalPlan) plan), inputConstraints);
         }
 
         DataBag outputData = derivedData.get(filter);
-        DataBag inputData = derivedData.get(filter.getInput());
+        DataBag inputData = derivedData.get(filter.getInput((LogicalPlan) plan));
         try {
             if (outputConstraints != null && outputConstraints.size() > 0) { // there
                 // 's
@@ -365,19 +404,19 @@ public class AugmentBaseDataVisitor exte
             log
                     .error("Error visiting Load during Augmentation phase of Example Generator! "
                             + e.getMessage());
-            throw new VisitorException(
+            throw new FrontendException(
                     "Error visiting Load during Augmentation phase of Example Generator! "
                             + e.getMessage());
         }
     }
 
     @Override
-    protected void visit(LOForEach forEach) throws VisitorException {
-        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+    public void visit(LOForEach forEach) throws FrontendException {
+        if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
             return;
         DataBag outputConstraints = outputConstraintsMap.get(forEach);
         outputConstraintsMap.remove(forEach);
-        List<LogicalPlan> plans = forEach.getForEachPlans();
+        LogicalPlan plan = forEach.getInnerPlan();
         boolean ableToHandle = true;
         List<Integer> cols = new ArrayList<Integer>();
         boolean cast = false;
@@ -386,20 +425,18 @@ public class AugmentBaseDataVisitor exte
             // we dont have to do anything in this case
             return;
 
-        for (LogicalPlan plan : plans) {
-            LogicalOperator op = plan.getLeaves().get(0);
-            if (op instanceof LOCast) {
+
+        Operator op = plan.getSinks().get(0);
+        if (op instanceof CastExpression) {
                 cast = true;
-                op = ((LOCast) op).getExpression();
+                op = ((CastExpression) op).getExpression();
             }
 
-            if (!(op instanceof LOProject)) {
+            if (!(op instanceof ProjectExpression)) {
                 ableToHandle = false;
-                break;
             } else {
-                cols.add(((LOProject) op).getCol());
+                cols.add(Integer.valueOf(((ProjectExpression) op).getColNum()));
             }
-        }
 
         if (ableToHandle) {
             // we can only handle simple projections
@@ -409,26 +446,31 @@ public class AugmentBaseDataVisitor exte
                 Tuple outputConstraint = it.next();
                 try {
                     Tuple inputConstraint = BackPropConstraint(
-                            outputConstraint, cols, (forEach.getPlan()
-                                    .getPredecessors(forEach)).get(0)
+                            outputConstraint, cols, ((LogicalRelationalOperator)plan
+                                    .getPredecessors(forEach).get(0))
                                     .getSchema(), cast);
                     output.add(inputConstraint);
                 } catch (Exception e) {
                     e.printStackTrace();
-                    throw new VisitorException(
+                    throw new FrontendException(
                             "Operator error during Augmenting Phase in Example Generator "
                                     + e.getMessage());
                 }
             }
-            outputConstraintsMap.put(forEach.getPlan().getPredecessors(forEach)
+            outputConstraintsMap.put(plan.getPredecessors(forEach)
                     .get(0), output);
         }
 
     }
 
     @Override
-    protected void visit(LOLoad load) throws VisitorException {
+    public void visit(LOLoad load) throws FrontendException {
         DataBag inputData = baseData.get(load);
+       // check if the inputData exists
+        if (inputData == null || inputData.size() == 0) {
+            log.error("No (valid) input data found!");
+            throw new RuntimeException("No (valid) input data found!");
+        }
 
         DataBag newInputData = newBaseData.get(load);
         if (newInputData == null) {
@@ -436,7 +478,7 @@ public class AugmentBaseDataVisitor exte
             newBaseData.put(load, newInputData);
         }
 
-        Schema schema;
+        LogicalSchema schema;
         try {
             schema = load.getSchema();
             if (schema == null)
@@ -447,17 +489,15 @@ public class AugmentBaseDataVisitor exte
             log
                     .error("Error visiting Load during Augmentation phase of Example Generator! "
                             + e.getMessage());
-            throw new VisitorException(
+            throw new FrontendException(
                     "Error visiting Load during Augmentation phase of Example Generator! "
                             + e.getMessage());
         }
+        
+        Tuple exampleTuple = inputData.iterator().next();
+        
         DataBag outputConstraints = outputConstraintsMap.get(load);
         outputConstraintsMap.remove(load);
-        // check if the inputData exists
-        if (inputData == null || inputData.size() == 0) {
-            log.error("No (valid) input data found!");
-            throw new RuntimeException("No (valid) input data found!");
-        }
 
         // first of all, we are required to guarantee that there is at least one
         // output tuple
@@ -469,7 +509,6 @@ public class AugmentBaseDataVisitor exte
 
         // create example tuple to steal values from when we encounter
         // "don't care" fields (i.e. null fields)
-        Tuple exampleTuple = inputData.iterator().next();
         System.out.println(exampleTuple.toString());
 
         // run through output constraints; for each one synthesize a tuple and
@@ -505,7 +544,7 @@ public class AugmentBaseDataVisitor exte
                 log
                         .error("Error visiting Load during Augmentation phase of Example Generator! "
                                 + e.getMessage());
-                throw new VisitorException(
+                throw new FrontendException(
                         "Error visiting Load during Augmentation phase of Example Generator! "
                                 + e.getMessage());
 
@@ -521,24 +560,14 @@ public class AugmentBaseDataVisitor exte
                         newInput = true;
                 }
             } catch (ExecException e) {
-                throw new VisitorException(
+                throw new FrontendException(
                   "Error visiting Load during Augmentation phase of Example Generator! "
                           + e.getMessage());
             }
         }
-        
-        if (newInput) {
-            for (Map.Entry<LOLoad, DataBag> entry : newBaseData.entrySet()) {
-                LOLoad otherLoad = entry.getKey();
-                if (otherLoad != load && otherLoad.getInputFile().equals(load.getInputFile())) {
-                    // different load sharing the same input file
-                    entry.getValue().addAll(newInputData);
-                }
-            }
-        }
     }
 
-    private boolean inInput(Tuple newTuple, DataBag input, Schema schema) throws ExecException {
+    private boolean inInput(Tuple newTuple, DataBag input, LogicalSchema schema) throws ExecException {
         boolean result;
         for (Iterator<Tuple> iter = input.iterator(); iter.hasNext();) {
             result = true;
@@ -556,43 +585,43 @@ public class AugmentBaseDataVisitor exte
     }
     
     @Override
-    protected void visit(LOSort s) throws VisitorException {
-        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+    public void visit(LOSort s) throws FrontendException {
+        if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
             return;
         DataBag outputConstraints = outputConstraintsMap.get(s);
         outputConstraintsMap.remove(s);
 
         if (outputConstraints == null)
-            outputConstraintsMap.put(s.getInput(), BagFactory.getInstance()
+            outputConstraintsMap.put(s.getInput((LogicalPlan) plan), BagFactory.getInstance()
                     .newDefaultBag());
         else
-            outputConstraintsMap.put(s.getInput(), outputConstraints);
+            outputConstraintsMap.put(s.getInput((LogicalPlan) plan), outputConstraints);
     }
 
     @Override
-    protected void visit(LOSplit split) throws VisitorException {
-        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+    public void visit(LOSplit split) throws FrontendException {
+        if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
           return;
     }
 
     @Override
-    protected void visit(LOStore store) throws VisitorException {
-        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+    public void visit(LOStore store) throws FrontendException {
+        if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
             return;
         DataBag outputConstraints = outputConstraintsMap.get(store);
         if (outputConstraints == null) {
-            outputConstraintsMap.put(store.getPlan().getPredecessors(store)
+            outputConstraintsMap.put(plan.getPredecessors(store)
                     .get(0), BagFactory.getInstance().newDefaultBag());
         } else {
             outputConstraintsMap.remove(store);
-            outputConstraintsMap.put(store.getPlan().getPredecessors(store)
+            outputConstraintsMap.put(plan.getPredecessors(store)
                     .get(0), outputConstraints);
         }
     }
 
     @Override
-    protected void visit(LOUnion u) throws VisitorException {
-        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+    public void visit(LOUnion u) throws FrontendException {
+        if (limit && !((PreOrderDepthFirstWalker) currentWalker).getBranchFlag())
             return;
         DataBag outputConstraints = outputConstraintsMap.get(u);
         outputConstraintsMap.remove(u);
@@ -600,7 +629,7 @@ public class AugmentBaseDataVisitor exte
             // we dont need to do anything
             // we just find the inputs, create empty bags as their
             // outputConstraints and return
-            for (LogicalOperator op : u.getInputs()) {
+            for (Operator op : u.getInputs((LogicalPlan) plan)) {
                 DataBag constraints = BagFactory.getInstance().newDefaultBag();
                 outputConstraintsMap.put(op, constraints);
             }
@@ -610,10 +639,10 @@ public class AugmentBaseDataVisitor exte
         // since we have some outputConstraints, we apply them to the inputs
         // round-robin
         int count = 0;
-        List<LogicalOperator> inputs = u.getInputs();
+        List<Operator> inputs = u.getInputs(((LogicalPlan) plan));
         int noInputs = inputs.size();
 
-        for (LogicalOperator op : inputs) {
+        for (Operator op : inputs) {
             DataBag constraint = BagFactory.getInstance().newDefaultBag();
             outputConstraintsMap.put(op, constraint);
         }
@@ -626,7 +655,7 @@ public class AugmentBaseDataVisitor exte
     }
 
     @Override
-    protected void visit(LOLimit lm) throws VisitorException {
+    public void visit(LOLimit lm) throws FrontendException {
         if (!limit) // not augment for LIMIT in this traversal
             return;
         
@@ -636,13 +665,13 @@ public class AugmentBaseDataVisitor exte
         DataBag outputConstraints = outputConstraintsMap.get(lm);
         outputConstraintsMap.remove(lm);
 
-        DataBag inputConstraints = outputConstraintsMap.get(lm.getInput());
+        DataBag inputConstraints = outputConstraintsMap.get(lm.getInput((LogicalPlan) plan));
         if (inputConstraints == null) {
             inputConstraints = BagFactory.getInstance().newDefaultBag();
-            outputConstraintsMap.put(lm.getInput(), inputConstraints);
+            outputConstraintsMap.put(lm.getInput((LogicalPlan) plan), inputConstraints);
         }
 
-        DataBag inputData = derivedData.get(lm.getInput());
+        DataBag inputData = derivedData.get(lm.getInput((LogicalPlan) plan));
         
         if (outputConstraints != null && outputConstraints.size() > 0) { // there
             // 's
@@ -662,7 +691,7 @@ public class AugmentBaseDataVisitor exte
              // ... plus one more if only one
              if (inputConstraints.size() == 1) {
                 inputConstraints.add(inputData.iterator().next());
-                ((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).setBranchFlag();
+                ((PreOrderDepthFirstWalker) currentWalker).setBranchFlag();
              }
           }
         } else if (inputConstraints.size() == 0){
@@ -671,7 +700,7 @@ public class AugmentBaseDataVisitor exte
             // ... plus one more if only one
             if (inputConstraints.size() == 1) {
                 inputConstraints.add(inputData.iterator().next());
-                ((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).setBranchFlag();
+                ((PreOrderDepthFirstWalker) currentWalker).setBranchFlag();
             }
         }
         POLimit poLimit = (POLimit) logToPhysMap.get(lm);
@@ -700,7 +729,7 @@ public class AugmentBaseDataVisitor exte
     }
 
     Tuple BackPropConstraint(Tuple outputConstraint, List<Integer> cols,
-            Schema inputSchema, boolean cast) throws ExecException {
+            LogicalSchema inputSchema, boolean cast) throws ExecException {
         Tuple inputConst = TupleFactory.getInstance().newTuple(
                 inputSchema.getFields().size());
 
@@ -732,8 +761,8 @@ public class AugmentBaseDataVisitor exte
     // predicate
     // (or null if unable to find such a tuple)
 
-    ExampleTuple GenerateMatchingTuple(Schema schema, LogicalPlan plan,
-            boolean invert) throws ExecException {
+    ExampleTuple GenerateMatchingTuple(LogicalSchema schema, LogicalExpressionPlan plan,
+            boolean invert) throws FrontendException, ExecException {
         return GenerateMatchingTuple(TupleFactory.getInstance().newTuple(
                 schema.getFields().size()), plan, invert);
     }
@@ -753,39 +782,39 @@ public class AugmentBaseDataVisitor exte
     // what predicate it wants satisfied in a given field)
     //
 
-    ExampleTuple GenerateMatchingTuple(Tuple constraint, LogicalPlan predicate,
-            boolean invert) throws ExecException {
+    ExampleTuple GenerateMatchingTuple(Tuple constraint, LogicalExpressionPlan predicate,
+            boolean invert) throws ExecException, FrontendException {
         Tuple t = TupleFactory.getInstance().newTuple(constraint.size());
         ExampleTuple tOut = new ExampleTuple(t);
         for (int i = 0; i < t.size(); i++)
             tOut.set(i, constraint.get(i));
 
-        GenerateMatchingTupleHelper(tOut, (ExpressionOperator) predicate
-                .getLeaves().get(0), invert);
+        GenerateMatchingTupleHelper(tOut, predicate
+                .getSources().get(0), invert);
         tOut.synthetic = true;
         return tOut;
 
     }
 
-    void GenerateMatchingTupleHelper(Tuple t, ExpressionOperator pred,
-            boolean invert) throws ExecException {
-        if (pred instanceof BinaryExpressionOperator)
-            GenerateMatchingTupleHelper(t, (BinaryExpressionOperator) pred,
+    void GenerateMatchingTupleHelper(Tuple t, Operator pred,
+            boolean invert) throws FrontendException, ExecException {
+        if (pred instanceof BinaryExpression)
+            GenerateMatchingTupleHelper(t, (BinaryExpression) pred,
                     invert);
-        else if (pred instanceof LONot)
-            GenerateMatchingTupleHelper(t, (LONot) pred, invert);
+        else if (pred instanceof NotExpression)
+            GenerateMatchingTupleHelper(t, (NotExpression) pred, invert);
         else
-            throw new ExecException("Unknown operator in filter predicate");
+            throw new FrontendException("Unknown operator in filter predicate");
     }
 
-    void GenerateMatchingTupleHelper(Tuple t, BinaryExpressionOperator pred,
-            boolean invert) throws ExecException {
+    void GenerateMatchingTupleHelper(Tuple t, BinaryExpression pred,
+            boolean invert) throws FrontendException, ExecException {
 
-        if (pred instanceof LOAnd) {
-            GenerateMatchingTupleHelper(t, (LOAnd) pred, invert);
+        if (pred instanceof AndExpression) {
+            GenerateMatchingTupleHelper(t, (AndExpression) pred, invert);
             return;
-        } else if (pred instanceof LOOr) {
-            GenerateMatchingTupleHelper(t, (LOOr) pred, invert);
+        } else if (pred instanceof OrExpression) {
+            GenerateMatchingTupleHelper(t, (OrExpression) pred, invert);
             return;
         }
 
@@ -798,27 +827,26 @@ public class AugmentBaseDataVisitor exte
 
         int leftCol = -1, rightCol = -1;
 
-        if (pred instanceof LOAdd || pred instanceof LOSubtract
-                || pred instanceof LOMultiply || pred instanceof LODivide
-                || pred instanceof LOMod || pred instanceof LORegexp)
+        if (pred instanceof AddExpression || pred instanceof SubtractExpression
+                || pred instanceof MultiplyExpression || pred instanceof DivideExpression
+                || pred instanceof ModExpression || pred instanceof RegexExpression)
             return; // We don't try to work around these operators right now
 
-        if (pred.getLhsOperand() instanceof LOConst) {
+        if (pred.getLhs() instanceof ConstantExpression) {
             leftIsConst = true;
-            leftConst = ((LOConst) (pred.getLhsOperand())).getValue();
+            leftConst = ((ConstantExpression) (pred.getLhs())).getValue();
         } else {
-            LogicalOperator lhs = pred.getLhsOperand();
-            if (lhs instanceof LOCast)
-                lhs = ((LOCast) lhs).getExpression();
-            // if (!(pred.getLhsOperand() instanceof LOProject && ((LOProject)
+            LogicalExpression lhs = pred.getLhs();
+            if (lhs instanceof CastExpression)
+                lhs = ((CastExpression) lhs).getExpression();
+            // if (!(pred.getLhsOperand() instanceof ProjectExpression && ((ProjectExpression)
             // pred
             // .getLhsOperand()).getProjection().size() == 1))
             // return; // too hard
-            if (!(lhs instanceof LOProject && ((LOProject) lhs).getProjection()
-                    .size() == 1))
+            if (!(lhs instanceof ProjectExpression))
                 return;
-            leftCol = ((LOProject) lhs).getCol();
-            leftDataType = ((LOProject) lhs).getType();
+            leftCol = ((ProjectExpression) lhs).getColNum();
+            leftDataType = ((ProjectExpression) lhs).getType();
 
             Object d = t.get(leftCol);
             if (d != null) {
@@ -827,22 +855,21 @@ public class AugmentBaseDataVisitor exte
             }
         }
 
-        if (pred.getRhsOperand() instanceof LOConst) {
+        if (pred.getRhs() instanceof ConstantExpression) {
             rightIsConst = true;
-            rightConst = ((LOConst) (pred.getRhsOperand())).getValue();
+            rightConst = ((ConstantExpression) (pred.getRhs())).getValue();
         } else {
-            LogicalOperator rhs = pred.getRhsOperand();
-            if (rhs instanceof LOCast)
-                rhs = ((LOCast) rhs).getExpression();
-            // if (!(pred.getRhsOperand() instanceof LOProject && ((LOProject)
+            Operator rhs = pred.getRhs();
+            if (rhs instanceof CastExpression)
+                rhs = ((CastExpression) rhs).getExpression();
+            // if (!(pred.getRhsOperand() instanceof ProjectExpression && ((ProjectExpression)
             // pred
             // .getRhsOperand()).getProjection().size() == 1))
             // return; // too hard
-            if (!(rhs instanceof LOProject && ((LOProject) rhs).getProjection()
-                    .size() == 1))
+            if (!(rhs instanceof ProjectExpression))
                 return;
-            rightCol = ((LOProject) rhs).getCol();
-            rightDataType = ((LOProject) rhs).getType();
+            rightCol = ((ProjectExpression) rhs).getColNum();
+            rightDataType = ((ProjectExpression) rhs).getType();
 
             Object d = t.get(rightCol);
             if (d != null) {
@@ -858,7 +885,7 @@ public class AugmentBaseDataVisitor exte
 
         // convert some nulls to constants
         if (!invert) {
-            if (pred instanceof LOEqual) {
+            if (pred instanceof EqualExpression) {
                 if (leftIsConst) {
                     t.set(rightCol, generateData(rightDataType, leftConst
                             .toString()));
@@ -869,7 +896,7 @@ public class AugmentBaseDataVisitor exte
                     t.set(leftCol, generateData(leftDataType, "0"));
                     t.set(rightCol, generateData(rightDataType, "0"));
                 }
-            } else if (pred instanceof LONotEqual) {
+            } else if (pred instanceof NotEqualExpression) {
                 if (leftIsConst) {
                     t.set(rightCol, generateData(rightDataType,
                             GetUnequalValue(leftConst).toString()));
@@ -880,8 +907,8 @@ public class AugmentBaseDataVisitor exte
                     t.set(leftCol, generateData(leftDataType, "0"));
                     t.set(rightCol, generateData(rightDataType, "1"));
                 }
-            } else if (pred instanceof LOGreaterThan
-                    || pred instanceof LOGreaterThanEqual) {
+            } else if (pred instanceof GreaterThanExpression
+                    || pred instanceof GreaterThanEqualExpression) {
                 if (leftIsConst) {
                     t.set(rightCol, generateData(rightDataType,
                             GetSmallerValue(leftConst).toString()));
@@ -892,8 +919,8 @@ public class AugmentBaseDataVisitor exte
                     t.set(leftCol, generateData(leftDataType, "1"));
                     t.set(rightCol, generateData(rightDataType, "0"));
                 }
-            } else if (pred instanceof LOLesserThan
-                    || pred instanceof LOLesserThanEqual) {
+            } else if (pred instanceof LessThanExpression
+                    || pred instanceof LessThanEqualExpression) {
                 if (leftIsConst) {
                     t.set(rightCol, generateData(rightDataType, GetLargerValue(
                             leftConst).toString()));
@@ -906,7 +933,7 @@ public class AugmentBaseDataVisitor exte
                 }
             }
         } else {
-            if (pred instanceof LOEqual) {
+            if (pred instanceof EqualExpression) {
                 if (leftIsConst) {
                     t.set(rightCol, generateData(rightDataType,
                             GetUnequalValue(leftConst).toString()));
@@ -917,7 +944,7 @@ public class AugmentBaseDataVisitor exte
                     t.set(leftCol, generateData(leftDataType, "0"));
                     t.set(rightCol, generateData(rightDataType, "1"));
                 }
-            } else if (pred instanceof LONotEqual) {
+            } else if (pred instanceof NotEqualExpression) {
                 if (leftIsConst) {
                     t.set(rightCol, generateData(rightDataType, leftConst
                             .toString()));
@@ -928,8 +955,8 @@ public class AugmentBaseDataVisitor exte
                     t.set(leftCol, generateData(leftDataType, "0"));
                     t.set(rightCol, generateData(rightDataType, "0"));
                 }
-            } else if (pred instanceof LOGreaterThan
-                    || pred instanceof LOGreaterThanEqual) {
+            } else if (pred instanceof GreaterThanExpression
+                    || pred instanceof GreaterThanEqualExpression) {
                 if (leftIsConst) {
                     t.set(rightCol, generateData(rightDataType, GetLargerValue(
                             leftConst).toString()));
@@ -940,8 +967,8 @@ public class AugmentBaseDataVisitor exte
                     t.set(leftCol, generateData(leftDataType, "0"));
                     t.set(rightCol, generateData(rightDataType, "1"));
                 }
-            } else if (pred instanceof LOLesserThan
-                    || pred instanceof LOLesserThanEqual) {
+            } else if (pred instanceof LessThanExpression
+                    || pred instanceof LessThanEqualExpression) {
                 if (leftIsConst) {
                     t.set(rightCol, generateData(rightDataType,
                             GetSmallerValue(leftConst).toString()));
@@ -957,27 +984,27 @@ public class AugmentBaseDataVisitor exte
 
     }
 
-    void GenerateMatchingTupleHelper(Tuple t, LOAnd op, boolean invert)
-            throws ExecException {
-        ExpressionOperator input = op.getLhsOperand();
+    void GenerateMatchingTupleHelper(Tuple t, AndExpression op, boolean invert)
+            throws FrontendException, ExecException {
+        Operator input = op.getLhs();
         GenerateMatchingTupleHelper(t, input, invert);
-        input = op.getRhsOperand();
+        input = op.getRhs();
         GenerateMatchingTupleHelper(t, input, invert);
 
     }
 
-    void GenerateMatchingTupleHelper(Tuple t, LOOr op, boolean invert)
-            throws ExecException {
-        ExpressionOperator input = op.getLhsOperand();
+    void GenerateMatchingTupleHelper(Tuple t, OrExpression op, boolean invert)
+            throws FrontendException, ExecException {
+        Operator input = op.getLhs();
         GenerateMatchingTupleHelper(t, input, invert);
-        input = op.getRhsOperand();
+        input = op.getRhs();
         GenerateMatchingTupleHelper(t, input, invert);
 
     }
 
-    void GenerateMatchingTupleHelper(Tuple t, LONot op, boolean invert)
-            throws ExecException {
-        ExpressionOperator input = op.getOperand();
+    void GenerateMatchingTupleHelper(Tuple t, NotExpression op, boolean invert)
+            throws FrontendException, ExecException {
+        LogicalExpression input = op.getExpression();
         GenerateMatchingTupleHelper(t, input, !invert);
 
     }

Modified: pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java Fri Dec 17 22:08:08 2010
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.pig.pen;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Random;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
-import org.apache.pig.data.BagFactory;
-import org.apache.pig.data.DataBag;
-import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.PigContext;
-import org.apache.pig.impl.logicalLayer.FrontendException;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LODistinct;
-import org.apache.pig.impl.logicalLayer.LOFilter;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOLimit;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LOSort;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOStore;
-import org.apache.pig.impl.logicalLayer.LOUnion;
-import org.apache.pig.impl.logicalLayer.LOVisitor;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.PlanSetter;
-import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
-import org.apache.pig.impl.plan.DependencyOrderWalker;
-import org.apache.pig.impl.plan.OperatorKey;
-import org.apache.pig.impl.plan.PlanException;
-import org.apache.pig.impl.plan.VisitorException;
-import org.apache.pig.impl.util.IdentityHashSet;
-import org.apache.pig.pen.util.DependencyOrderLimitedWalker;
-import org.apache.pig.pen.util.LineageTracer;
-
-
-//This class is used to pass data through the entire plan and save the intermediates results.
-public class DerivedDataVisitor {
-
-    Map<LogicalOperator, DataBag> derivedData = new HashMap<LogicalOperator, DataBag>();
-    PhysicalPlan physPlan = null;
-    Map<LOLoad, DataBag> baseData = null;
-
-    Map<LogicalOperator, PhysicalOperator> LogToPhyMap = null;
-    Log log = LogFactory.getLog(getClass());
-
-    Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> OpToEqClasses = null;
-    Collection<IdentityHashSet<Tuple>> EqClasses = null;
-
-    LineageTracer lineage = new LineageTracer();
-
-    public DerivedDataVisitor(LogicalPlan plan, PigContext pc,
-            Map<LOLoad, DataBag> baseData,
-            PhysicalPlan physPlan) {
-
-        this.baseData = baseData;
-
-        OpToEqClasses = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
-        EqClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
-        this.physPlan = physPlan;
-        // if(logToPhyMap == null)
-        // compilePlan(plan);
-        // else
-        // LogToPhyMap = logToPhyMap;
-
-    }
-
-    public DerivedDataVisitor(LogicalOperator op, PigContext pc,
-            Map<LOLoad, DataBag> baseData,
-            Map<LogicalOperator, PhysicalOperator> logToPhyMap,
-            PhysicalPlan physPlan) {
-        this.baseData = baseData;
-
-        OpToEqClasses = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
-        EqClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
-        LogToPhyMap = logToPhyMap;
-        this.physPlan = physPlan;
-    }
-}

Modified: pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java?rev=1050503&r1=1050502&r2=1050503&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java (original)
+++ pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java Fri Dec 17 22:08:08 2010
@@ -29,56 +29,56 @@ import java.util.Iterator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOCross;
-import org.apache.pig.impl.logicalLayer.LogicalOperator;
-import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.newplan.logical.relational.LOForEach;
+import org.apache.pig.newplan.logical.relational.LOCross;
+import org.apache.pig.newplan.logical.relational.LogicalRelationalOperator;
+import org.apache.pig.newplan.logical.relational.LogicalPlan;
+import org.apache.pig.newplan.Operator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
-import org.apache.pig.impl.plan.VisitorException;
 
 
 //These methods are used to generate equivalence classes given the operator name and the output from the operator
 //For example, it gives out 2 eq. classes for filter, one that passes the filter and one that doesn't
 public class EquivalenceClasses {
     
-    public static Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> getLoToEqClassMap(PhysicalPlan plan,
-        LogicalPlan lp, Map<LogicalOperator, PhysicalOperator> logToPhyMap,
-        Map<LogicalOperator, DataBag> logToDataMap,
-        Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
+    public static Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> getLoToEqClassMap(PhysicalPlan plan,
+        LogicalPlan lp, Map<Operator, PhysicalOperator> logToPhyMap,
+        Map<Operator, DataBag> logToDataMap,
+        Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
         final HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap)
-        throws VisitorException {
-        Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> ret =
-          new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
-        List<LogicalOperator> roots = lp.getRoots();
-        HashSet<LogicalOperator> seen = new HashSet<LogicalOperator>();
-        for(LogicalOperator lo: roots) {
+    {
+        Map<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>> ret =
+          new HashMap<LogicalRelationalOperator, Collection<IdentityHashSet<Tuple>>>();
+        List<Operator> roots = lp.getSources();
+        HashSet<Operator> seen = new HashSet<Operator>();
+        for(Operator lo: roots) {
             getEqClasses(plan, lo, lp, logToPhyMap, ret, poToEqclassesMap, logToDataMap, forEachInnerLogToPhyMap, seen);
         }
         return ret;
     }
     
-    private static void getEqClasses(PhysicalPlan plan, LogicalOperator parent, LogicalPlan lp,
-        Map<LogicalOperator, PhysicalOperator> logToPhyMap, Map<LogicalOperator,
+    private static void getEqClasses(PhysicalPlan plan, Operator parent, LogicalPlan lp,
+        Map<Operator, PhysicalOperator> logToPhyMap, Map<LogicalRelationalOperator,
         Collection<IdentityHashSet<Tuple>>> result,
         final HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap,
-        Map<LogicalOperator, DataBag> logToDataMap,
-        Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
-        HashSet<LogicalOperator> seen) throws VisitorException {
+        Map<Operator, DataBag> logToDataMap,
+        Map<LOForEach, Map<LogicalRelationalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
+        HashSet<Operator> seen) {
         if (parent instanceof LOForEach) {
             if (poToEqclassesMap.get(logToPhyMap.get(parent)) != null) {
                 LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
                 eqClasses.addAll(poToEqclassesMap.get(logToPhyMap.get(parent)));
-                for (Map.Entry<LogicalOperator, PhysicalOperator> entry : forEachInnerLogToPhyMap.get(parent).entrySet()) {
+                for (Map.Entry<LogicalRelationalOperator, PhysicalOperator> entry : forEachInnerLogToPhyMap.get(parent).entrySet()) {
                     if (poToEqclassesMap.get(entry.getValue()) != null)
                         eqClasses.addAll(poToEqclassesMap.get(entry.getValue()));
                 }
-                result.put(parent, eqClasses);
+                result.put((LogicalRelationalOperator) parent, eqClasses);
             }
         } else if (parent instanceof LOCross) {
             boolean ok = true; 
-            for (LogicalOperator input : ((LOCross) parent).getInputs()) {
+            for (Operator input : ((LOCross) parent).getInputs()) {
                 if (logToDataMap.get(input).size() < 2) {
                     // only if all inputs have at least more than two tuples will all outputs be added to the eq. class
                     ok = false;
@@ -92,12 +92,12 @@ public class EquivalenceClasses {
                     eqClass.add(it.next());
                 }
                 eqClasses.add(eqClass);
-                result.put(parent, eqClasses);
+                result.put((LogicalRelationalOperator) parent, eqClasses);
             } else {
                 LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
                 IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
                 eqClasses.add(eqClass);
-                result.put(parent, eqClasses);
+                result.put((LogicalRelationalOperator)parent, eqClasses);
             }
         } else {
             Collection<IdentityHashSet<Tuple>> eqClasses = poToEqclassesMap.get(logToPhyMap.get(parent));
@@ -108,11 +108,11 @@ public class EquivalenceClasses {
                     eqClasses.add(new IdentityHashSet<Tuple>());
                 }
             }
-            result.put(parent, eqClasses);
+            result.put((LogicalRelationalOperator)parent, eqClasses);
         }
         // result.put(parent, getEquivalenceClasses(plan, parent, lp, logToPhyMap, poToEqclassesMap));
         if (lp.getSuccessors(parent) != null) {
-            for (LogicalOperator lo : lp.getSuccessors(parent)) {
+            for (Operator lo : lp.getSuccessors(parent)) {
                 if (!seen.contains(lo)) {
                     seen.add(lo);
                     getEqClasses(plan, lo, lp, logToPhyMap, result, poToEqclassesMap, logToDataMap, forEachInnerLogToPhyMap, seen);