Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/13 20:11:04 UTC

svn commit: r1045314 [3/5] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src...

Modified: pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/AugmentBaseDataVisitor.java Mon Dec 13 19:11:00 2010
@@ -24,10 +24,13 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.HashSet;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataByteArray;
@@ -41,6 +44,7 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LOAnd;
 import org.apache.pig.impl.logicalLayer.LOCast;
 import org.apache.pig.impl.logicalLayer.LOCogroup;
+import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.logicalLayer.LOConst;
 import org.apache.pig.impl.logicalLayer.LOCross;
 import org.apache.pig.impl.logicalLayer.LODistinct;
@@ -69,10 +73,10 @@ import org.apache.pig.impl.logicalLayer.
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.plan.DepthFirstWalker;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
 import org.apache.pig.pen.util.PreOrderDepthFirstWalker;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLimit;
 
 //This is used to generate synthetic data
 //Synthetic data generation is done by making constraint tuples for each operator as we traverse the plan
@@ -83,6 +87,9 @@ public class AugmentBaseDataVisitor exte
     Map<LOLoad, DataBag> baseData = null;
     Map<LOLoad, DataBag> newBaseData = new HashMap<LOLoad, DataBag>();
     Map<LogicalOperator, DataBag> derivedData = null;
+    private boolean limit = false;
+    private final Map<LogicalOperator, PhysicalOperator> logToPhysMap;
+    private Map<LOLimit, Long> oriLimitMap;
 
     Map<LogicalOperator, DataBag> outputConstraintsMap = new HashMap<LogicalOperator, DataBag>();
 
@@ -91,15 +98,20 @@ public class AugmentBaseDataVisitor exte
     // Augmentation moves from the leaves to root and hence needs a
     // depthfirstwalker
     public AugmentBaseDataVisitor(LogicalPlan plan,
+            Map<LogicalOperator, PhysicalOperator> logToPhysMap,
             Map<LOLoad, DataBag> baseData,
             Map<LogicalOperator, DataBag> derivedData) {
         super(plan, new PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>(
                 plan));
         this.baseData = baseData;
         this.derivedData = derivedData;
-
+        this.logToPhysMap = logToPhysMap;
     }
 
+    public void setLimit() {
+        limit = true;
+    }
+    
     public Map<LOLoad, DataBag> getNewBaseData() {
         for (Map.Entry<LOLoad, DataBag> e : baseData.entrySet()) {
             DataBag bag = newBaseData.get(e.getKey());
@@ -112,8 +124,14 @@ public class AugmentBaseDataVisitor exte
         return newBaseData;
     }
 
+    public Map<LOLimit, Long> getOriLimitMap() {
+        return oriLimitMap;
+    }
+    
     @Override
     protected void visit(LOCogroup cg) throws VisitorException {
+        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+            return;
         // we first get the outputconstraints for the current cogroup
         DataBag outputConstraints = outputConstraintsMap.get(cg);
         outputConstraintsMap.remove(cg);
@@ -234,11 +252,58 @@ public class AugmentBaseDataVisitor exte
 
     @Override
     protected void visit(LODistinct dt) throws VisitorException {
+        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+            return;
+    
+        DataBag outputConstraints = outputConstraintsMap.get(dt);
+        outputConstraintsMap.remove(dt);
 
+        DataBag inputConstraints = outputConstraintsMap.get(dt.getInput());
+        if (inputConstraints == null) {
+            inputConstraints = BagFactory.getInstance().newDefaultBag();
+            outputConstraintsMap.put(dt.getInput(), inputConstraints);
+        }
+    
+        if (outputConstraints != null && outputConstraints.size() > 0) {
+            for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
+                inputConstraints.add(it.next());
+            }
+        }
+        
+        boolean emptyInputConstraints = inputConstraints.size() == 0;
+        if (emptyInputConstraints) {
+            DataBag inputData = derivedData.get(dt.getInput());
+            for (Iterator<Tuple> it = inputData.iterator(); it.hasNext();) {
+                inputConstraints.add(it.next());
+            }
+        }
+        Set<Tuple> distinctSet = new HashSet<Tuple>();
+        Iterator<Tuple> it;
+        for (it = inputConstraints.iterator(); it.hasNext();) {
+            if (!distinctSet.add(it.next()))
+                break;
+        }
+        if (!it.hasNext()) {
+            // no duplicates found: generate one
+            if (inputConstraints.size() > 0) {
+                Tuple src = ((ExampleTuple) inputConstraints.iterator().next()).toTuple();
+                Tuple tgt = TupleFactory.getInstance().newTuple(src.getAll());
+                ExampleTuple inputConstraint = new ExampleTuple(tgt);
+                inputConstraint.synthetic = true;
+                inputConstraints.add(inputConstraint);
+            } else if (emptyInputConstraints)
+                inputConstraints.clear();
+        }
     }
 
     @Override
     protected void visit(LOFilter filter) throws VisitorException {
+        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+            return;
+        
         DataBag outputConstraints = outputConstraintsMap.get(filter);
         outputConstraintsMap.remove(filter);
 
@@ -308,6 +373,8 @@ public class AugmentBaseDataVisitor exte
 
     @Override
     protected void visit(LOForEach forEach) throws VisitorException {
+        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+            return;
         DataBag outputConstraints = outputConstraintsMap.get(forEach);
         outputConstraintsMap.remove(forEach);
         List<LogicalPlan> plans = forEach.getForEachPlans();
@@ -388,8 +455,8 @@ public class AugmentBaseDataVisitor exte
         outputConstraintsMap.remove(load);
         // check if the inputData exists
         if (inputData == null || inputData.size() == 0) {
-            log.error("No input data found!");
-            throw new RuntimeException("No input data found!");
+            log.error("No (valid) input data found!");
+            throw new RuntimeException("No (valid) input data found!");
         }
 
         // first of all, we are required to guarantee that there is at least one
@@ -403,11 +470,13 @@ public class AugmentBaseDataVisitor exte
         // create example tuple to steal values from when we encounter
         // "don't care" fields (i.e. null fields)
         Tuple exampleTuple = inputData.iterator().next();
 
         // run through output constraints; for each one synthesize a tuple and
         // add it to the base data
         // (while synthesizing individual fields, try to match fields that exist
         // in the real data)
+        boolean newInput = false;
         for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
             Tuple outputConstraint = it.next();
 
@@ -423,10 +492,15 @@ public class AugmentBaseDataVisitor exte
             try {
                 for (int i = 0; i < inputTuple.size(); i++) {
                     Object d = outputConstraint.get(i);
-                    if (d == null)
+                    if (d == null && i < exampleTuple.size())
                         d = exampleTuple.get(i);
                     inputTuple.set(i, d);
                 }
+                if (outputConstraint instanceof ExampleTuple)
+                    inputTuple.synthetic = ((ExampleTuple) outputConstraint).synthetic;
+                else
+                    // a raw (non-Example) tuple must have been synthesized
+                    inputTuple.synthetic = true;
             } catch (ExecException e) {
                 log
                         .error("Error visiting Load during Augmentation phase of Example Generator! "
@@ -436,15 +510,55 @@ public class AugmentBaseDataVisitor exte
                                 + e.getMessage());
 
             }
-            if (!inputTuple.equals(exampleTuple))
-                inputTuple.synthetic = true;
-
-            newInputData.add(inputTuple);
+            try {
+                if (inputTuple.synthetic || !inInput(inputTuple, inputData, schema)) {
+                    inputTuple.synthetic = true;
+
+                    newInputData.add(inputTuple);
+                    
+                    newInput = true;
+                }
+            } catch (ExecException e) {
+                throw new VisitorException(
+                  "Error visiting Load during Augmentation phase of Example Generator! "
+                          + e.getMessage());
+            }
+        }
+        
+        if (newInput) {
+            for (Map.Entry<LOLoad, DataBag> entry : newBaseData.entrySet()) {
+                LOLoad otherLoad = entry.getKey();
+                if (otherLoad != load && otherLoad.getInputFile().equals(load.getInputFile())) {
+                    // different load sharing the same input file
+                    entry.getValue().addAll(newInputData);
+                }
+            }
         }
     }
 
+    private boolean inInput(Tuple newTuple, DataBag input, Schema schema) throws ExecException {
+        for (Iterator<Tuple> iter = input.iterator(); iter.hasNext();) {
+            boolean matches = true;
+            Tuple tmp = iter.next();
+            for (int i = 0; i < schema.size(); ++i) {
+                if (!newTuple.get(i).equals(tmp.get(i))) {
+                    matches = false;
+                    break;
+                }
+            }
+            if (matches)
+                return true;
+        }
+        return false;
+    }
+    
     @Override
     protected void visit(LOSort s) throws VisitorException {
+        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+            return;
         DataBag outputConstraints = outputConstraintsMap.get(s);
         outputConstraintsMap.remove(s);
 
@@ -457,11 +571,14 @@ public class AugmentBaseDataVisitor exte
 
     @Override
     protected void visit(LOSplit split) throws VisitorException {
-
+        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+            return;
     }
 
     @Override
     protected void visit(LOStore store) throws VisitorException {
+        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+            return;
         DataBag outputConstraints = outputConstraintsMap.get(store);
         if (outputConstraints == null) {
             outputConstraintsMap.put(store.getPlan().getPredecessors(store)
@@ -475,6 +592,8 @@ public class AugmentBaseDataVisitor exte
 
     @Override
     protected void visit(LOUnion u) throws VisitorException {
+        if (limit && !((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).getBranchFlag())
+            return;
         DataBag outputConstraints = outputConstraintsMap.get(u);
         outputConstraintsMap.remove(u);
         if (outputConstraints == null || outputConstraints.size() == 0) {
@@ -506,6 +625,61 @@ public class AugmentBaseDataVisitor exte
 
     }
 
+    @Override
+    protected void visit(LOLimit lm) throws VisitorException {
+        if (!limit) // no LIMIT augmentation in this traversal
+            return;
+        
+        if (oriLimitMap == null)
+            oriLimitMap = new HashMap<LOLimit, Long>();
+        
+        DataBag outputConstraints = outputConstraintsMap.get(lm);
+        outputConstraintsMap.remove(lm);
+
+        DataBag inputConstraints = outputConstraintsMap.get(lm.getInput());
+        if (inputConstraints == null) {
+            inputConstraints = BagFactory.getInstance().newDefaultBag();
+            outputConstraintsMap.put(lm.getInput(), inputConstraints);
+        }
+
+        DataBag inputData = derivedData.get(lm.getInput());
+        
+        if (outputConstraints != null && outputConstraints.size() > 0) {
+            // there's one or more output constraints; generate corresponding
+            // input constraints
+            for (Iterator<Tuple> it = outputConstraints.iterator(); it.hasNext();) {
+                inputConstraints.add(it.next());
+                // ... plus one more if only one
+                if (inputConstraints.size() == 1) {
+                    inputConstraints.add(inputData.iterator().next());
+                    ((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).setBranchFlag();
+                }
+            }
+        } else if (inputConstraints.size() == 0) {
+            // add all input to input constraints ...
+            inputConstraints.addAll(inputData);
+            // ... plus one more if only one
+            if (inputConstraints.size() == 1) {
+                inputConstraints.add(inputData.iterator().next());
+                ((PreOrderDepthFirstWalker<LogicalOperator, LogicalPlan>) mCurrentWalker).setBranchFlag();
+            }
+        }
+        POLimit poLimit = (POLimit) logToPhysMap.get(lm);
+        oriLimitMap.put(lm, Long.valueOf(poLimit.getLimit()));
+        poLimit.setLimit(inputConstraints.size() - 1);
+        lm.setLimit(poLimit.getLimit());
+    }
+    
     Tuple GetGroupByInput(Object groupLabel, List<Integer> groupCols,
             int numFields) throws ExecException {
         Tuple t = TupleFactory.getInstance().newTuple(numFields);

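Note on the LIMIT handling above: the new visit(LOLimit) always leaves the
input one tuple larger than the limit it sets, so ILLUSTRATE can show a
tuple actually being cut off, and setLimit() plus the walker's branch flag
confine the extra pass to the branch feeding the LIMIT. A minimal, hedged
sketch of how that pass is driven, mirroring ExampleGenerator.getExamples()
later in this commit (the wrapper class and method below are hypothetical):

    import java.util.Map;

    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.impl.logicalLayer.LOLimit;
    import org.apache.pig.impl.logicalLayer.LOLoad;
    import org.apache.pig.impl.logicalLayer.LogicalOperator;
    import org.apache.pig.impl.logicalLayer.LogicalPlan;
    import org.apache.pig.impl.plan.VisitorException;
    import org.apache.pig.pen.AugmentBaseDataVisitor;

    public class LimitAugmentSketch {
        // Run the LIMIT-aware second augmentation pass and return the new base data.
        public static Map<LOLoad, DataBag> augmentForLimit(
                LogicalPlan plan,
                Map<LogicalOperator, PhysicalOperator> logToPhysMap,
                Map<LOLoad, DataBag> baseData,
                Map<LogicalOperator, DataBag> derivedData) throws VisitorException {
            AugmentBaseDataVisitor augment =
                    new AugmentBaseDataVisitor(plan, logToPhysMap, baseData, derivedData);
            augment.setLimit();  // only walk the branch feeding a LIMIT (branch flag)
            augment.visit();     // visit(LOLimit) records the original limit in
                                 // oriLimitMap and shrinks it to (constraints - 1)
            // originalLimits is later replayed onto the illustrators in
            // ExampleGenerator.getData()
            Map<LOLimit, Long> originalLimits = augment.getOriLimitMap();
            return augment.getNewBaseData();
        }
    }
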
Modified: pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/pen/DerivedDataVisitor.java Mon Dec 13 19:11:00 2010
@@ -67,7 +67,7 @@ import org.apache.pig.pen.util.LineageTr
 
 
 //This class is used to pass data through the entire plan and save the intermediate results.
-public class DerivedDataVisitor extends LOVisitor {
+public class DerivedDataVisitor {
 
     Map<LogicalOperator, DataBag> derivedData = new HashMap<LogicalOperator, DataBag>();
     PhysicalPlan physPlan = null;
@@ -83,16 +83,13 @@ public class DerivedDataVisitor extends 
 
     public DerivedDataVisitor(LogicalPlan plan, PigContext pc,
             Map<LOLoad, DataBag> baseData,
-            Map<LogicalOperator, PhysicalOperator> logToPhyMap,
             PhysicalPlan physPlan) {
-        super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(
-                plan));
+
         this.baseData = baseData;
 
         OpToEqClasses = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
         EqClasses = new LinkedList<IdentityHashSet<Tuple>>();
 
-        LogToPhyMap = logToPhyMap;
         this.physPlan = physPlan;
         // if(logToPhyMap == null)
         // compilePlan(plan);
@@ -105,9 +102,6 @@ public class DerivedDataVisitor extends 
             Map<LOLoad, DataBag> baseData,
             Map<LogicalOperator, PhysicalOperator> logToPhyMap,
             PhysicalPlan physPlan) {
-        super(op.getPlan(),
-                new DependencyOrderLimitedWalker<LogicalOperator, LogicalPlan>(
-                        op, op.getPlan()));
         this.baseData = baseData;
 
         OpToEqClasses = new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
@@ -115,364 +109,5 @@ public class DerivedDataVisitor extends 
 
         LogToPhyMap = logToPhyMap;
         this.physPlan = physPlan;
-        // if(logToPhyMap == null)
-        // compilePlan(op.getPlan());
-        // else
-        // LogToPhyMap = logToPhyMap;
-    }
-
-    public void setOperatorToEvaluate(LogicalOperator op) {
-        mCurrentWalker = new DependencyOrderLimitedWalker<LogicalOperator, LogicalPlan>(
-                op, op.getPlan());
-    }
-
-    @Override
-    protected void visit(LOCogroup cg) throws VisitorException {
-        // evaluateOperator(cg);
-        // there is a slightly different code path for cogroup because of the
-        // local rearranges
-        PhysicalOperator physOp = LogToPhyMap.get(cg);
-        Random r = new Random();
-        // get the list of original inputs
-
-        // List<PhysicalOperator> inputs = physOp.getInputs();
-        List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
-        PhysicalPlan phy = new PhysicalPlan();
-        phy.add(physOp);
-
-        // for(PhysicalOperator input : physOp.getInputs()) {
-        for (PhysicalOperator input : physPlan.getPredecessors(physOp)) {
-            inputs.add(input.getInputs().get(0));
-            // input.setInputs(null);
-            phy.add(input);
-            try {
-                phy.connect(input, physOp);
-            } catch (PlanException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-                log.error("Error connecting " + input.name() + " to "
-                        + physOp.name());
-            }
-        }
-
-        physOp.setLineageTracer(lineage);
-
-        // replace the original inputs by POReads
-        for (int i = 0; i < inputs.size(); i++) {
-            DataBag bag = derivedData.get(cg.getInputs().get(i));
-            PORead por = new PORead(new OperatorKey("", r.nextLong()), bag);
-            phy.add(por);
-            try {
-                phy.connect(por, physOp.getInputs().get(i));
-            } catch (PlanException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-                log.error("Error connecting " + por.name() + " to "
-                        + physOp.name());
-            }
-        }
-
-        DataBag output = BagFactory.getInstance().newDefaultBag();
-        Tuple t = null;
-        try {
-            for (Result res = physOp.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = physOp
-                    .getNext(t)) {
-                output.add((Tuple) res.result);
-            }
-        } catch (ExecException e) {
-            log.error("Error evaluating operator : " + physOp.name());
-        }
-        derivedData.put(cg, output);
-
-        try {
-            Collection<IdentityHashSet<Tuple>> eq = EquivalenceClasses
-                    .getEquivalenceClasses(cg, derivedData);
-            EqClasses.addAll(eq);
-            OpToEqClasses.put(cg, eq);
-        } catch (ExecException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-            log
-                    .error("Error updating equivalence classes while evaluating operators. \n"
-                            + e.getMessage());
-        }
-
-        // re-attach the original operators
-        // for(int i = 0; i < inputs.size(); i++) {
-        // try {
-        // physPlan.connect(inputs.get(i), physOp.getInputs().get(i));
-        //		
-        // } catch (PlanException e) {
-        // // TODO Auto-generated catch block
-        // e.printStackTrace();
-        // log.error("Error connecting " + inputs.get(i).name() + " to " +
-        // physOp.getInputs().get(i).name());
-        // }
-        // }
-        physOp.setLineageTracer(null);
-    }
-
-    @Override
-    protected void visit(LOCross cs) throws VisitorException {
-        evaluateOperator(cs);
-    }
-
-    @Override
-    protected void visit(LODistinct dt) throws VisitorException {
-        evaluateOperator(dt);
-    }
-
-    @Override
-    protected void visit(LOFilter filter) throws VisitorException {
-        evaluateOperator(filter);
-    }
-
-    @Override
-    protected void visit(LOForEach forEach) throws VisitorException {
-        evaluateOperator(forEach);
-    }
-
-    @Override
-    protected void visit(LOLoad load) throws VisitorException {
-        derivedData.put(load, baseData.get(load));
-
-        Collection<IdentityHashSet<Tuple>> eq = EquivalenceClasses
-                .getEquivalenceClasses(load, derivedData);
-        EqClasses.addAll(eq);
-        OpToEqClasses.put(load, eq);
-
-        for (Iterator<Tuple> it = derivedData.get(load).iterator(); it
-                .hasNext();) {
-            lineage.insert(it.next());
-        }
-
     }
-
-    @Override
-    protected void visit(LOSplit split) throws VisitorException {
-        evaluateOperator(split);
-    }
-
-    @Override
-    protected void visit(LOStore store) throws VisitorException {
-        derivedData.put(store, derivedData.get(store.getPlan().getPredecessors(
-                store).get(0)));
-    }
-
-    @Override
-    protected void visit(LOUnion u) throws VisitorException {
-        evaluateOperator(u);
-    }
-
-    @Override
-    protected void visit(LOLimit l) throws VisitorException {
-        evaluateOperator(l);
-    }
-    
-    @Override
-    protected void visit(LOSort sort) throws VisitorException {
-        evaluateOperator(sort);
-    }
-
-    // private void compilePlan(LogicalPlan plan) {
-    //	
-    // plan = refineLogicalPlan(plan);
-    //	
-    // LocalLogToPhyTranslationVisitor visitor = new
-    // LocalLogToPhyTranslationVisitor(plan);
-    // visitor.setPigContext(pc);
-    // try {
-    // visitor.visit();
-    // } catch (VisitorException e) {
-    // // TODO Auto-generated catch block
-    // e.printStackTrace();
-    // log.error("Error visiting the logical plan in ExampleGenerator");
-    // }
-    // physPlan = visitor.getPhysicalPlan();
-    // LogToPhyMap = visitor.getLogToPhyMap();
-    // }
-    //    
-    // private LogicalPlan refineLogicalPlan(LogicalPlan plan) {
-    // PlanSetter ps = new PlanSetter(plan);
-    // try {
-    // ps.visit();
-    //	    
-    // } catch (VisitorException e) {
-    // // TODO Auto-generated catch block
-    // e.printStackTrace();
-    // }
-    //        
-    // // run through validator
-    // CompilationMessageCollector collector = new CompilationMessageCollector()
-    // ;
-    // FrontendException caught = null;
-    // try {
-    // LogicalPlanValidationExecutor validator =
-    // new LogicalPlanValidationExecutor(plan, pc);
-    // validator.validate(plan, collector);
-    // } catch (FrontendException fe) {
-    // // Need to go through and see what the collector has in it. But
-    // // remember what we've caught so we can wrap it into what we
-    // // throw.
-    // caught = fe;
-    // }
-    //        
-    //        
-    // return plan;
-    //
-    // }
-
-    private void evaluateOperator(LogicalOperator op) {
-        PhysicalOperator physOp = LogToPhyMap.get(op);
-        Random r = new Random();
-        // get the list of original inputs
-
-        List<PhysicalOperator> inputs = physOp.getInputs();
-        physOp.setInputs(null);
-        physOp.setLineageTracer(lineage);
-        PhysicalPlan phy = new PhysicalPlan();
-        phy.add(physOp);
-
-        // replace the original inputs by POReads
-        for (LogicalOperator l : op.getPlan().getPredecessors(op)) {
-            DataBag bag = derivedData.get(l);
-            PORead por = new PORead(new OperatorKey("", r.nextLong()), bag);
-            phy.add(por);
-            try {
-                phy.connect(por, physOp);
-            } catch (PlanException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-                log.error("Error connecting " + por.name() + " to "
-                        + physOp.name());
-            }
-        }
-
-        DataBag output = BagFactory.getInstance().newDefaultBag();
-        Tuple t = null;
-        try {
-            for (Result res = physOp.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = physOp
-                    .getNext(t)) {
-                output.add((Tuple) res.result);
-            }
-        } catch (ExecException e) {
-            log.error("Error evaluating operator : " + physOp.name());
-        }
-        derivedData.put(op, output);
-
-        try {
-            Collection<IdentityHashSet<Tuple>> eq = EquivalenceClasses
-                    .getEquivalenceClasses(op, derivedData);
-            EqClasses.addAll(eq);
-            OpToEqClasses.put(op, eq);
-        } catch (ExecException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-            log
-                    .error("Error updating equivalence classes while evaluating operators. \n"
-                            + e.getMessage());
-        }
-
-        // re-attach the original operators
-        physOp.setInputs(inputs);
-        physOp.setLineageTracer(null);
-    }
-
-    public DataBag evaluateIsolatedOperator(LOCogroup op,
-            List<DataBag> inputBags) {
-        if (op.getPlan().getPredecessors(op).size() > inputBags.size())
-            return null;
-
-        int count = 0;
-        for (LogicalOperator inputs : op.getPlan().getPredecessors(op)) {
-            derivedData.put(inputs, inputBags.get(count++));
-        }
-
-        return evaluateIsolatedOperator(op);
-
-    }
-
-    public DataBag evaluateIsolatedOperator(LOCogroup op) {
-        // return null if the inputs are not already evaluated
-        for (LogicalOperator in : op.getPlan().getPredecessors(op)) {
-            if (derivedData.get(in) == null)
-                return null;
-        }
-
-        LineageTracer oldLineage = this.lineage;
-        this.lineage = new LineageTracer();
-
-        PhysicalOperator physOp = LogToPhyMap.get(op);
-        Random r = new Random();
-        // get the list of original inputs
-        // List<PhysicalOperator> inputs = physOp.getInputs();
-        List<PhysicalOperator> inputs = new ArrayList<PhysicalOperator>();
-        PhysicalPlan phy = new PhysicalPlan();
-        phy.add(physOp);
-
-        for (PhysicalOperator input : physOp.getInputs()) {
-            inputs.add(input.getInputs().get(0));
-            input.setInputs(null);
-            phy.add(input);
-            try {
-                phy.connect(input, physOp);
-            } catch (PlanException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-                log.error("Error connecting " + input.name() + " to "
-                        + physOp.name());
-            }
-        }
-        physOp.setLineageTracer(lineage);
-
-        physOp.setLineageTracer(null);
-
-        // replace the original inputs by POReads
-        for (int i = 0; i < inputs.size(); i++) {
-            DataBag bag = derivedData.get(op.getInputs().get(i));
-            PORead por = new PORead(new OperatorKey("", r.nextLong()), bag);
-            phy.add(por);
-            try {
-                phy.connect(por, physOp.getInputs().get(i));
-            } catch (PlanException e) {
-                // TODO Auto-generated catch block
-                e.printStackTrace();
-                log.error("Error connecting " + por.name() + " to "
-                        + physOp.name());
-            }
-        }
-
-        // replace the original inputs by POReads
-        // for(LogicalOperator l : op.getPlan().getPredecessors(op)) {
-        // DataBag bag = derivedData.get(l);
-        // PORead por = new PORead(new OperatorKey("", r.nextLong()), bag);
-        // phy.add(por);
-        // try {
-        // phy.connect(por, physOp);
-        // } catch (PlanException e) {
-        // // TODO Auto-generated catch block
-        // e.printStackTrace();
-        // log.error("Error connecting " + por.name() + " to " + physOp.name());
-        // }
-        // }
-
-        DataBag output = BagFactory.getInstance().newDefaultBag();
-        Tuple t = null;
-        try {
-            for (Result res = physOp.getNext(t); res.returnStatus != POStatus.STATUS_EOP; res = physOp
-                    .getNext(t)) {
-                output.add((Tuple) res.result);
-            }
-        } catch (ExecException e) {
-            log.error("Error evaluating operator : " + physOp.name());
-        }
-
-        this.lineage = oldLineage;
-
-        physOp.setInputs(inputs);
-        physOp.setLineageTracer(null);
-
-        return output;
-    }
-
 }

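Note on the deletions above: DerivedDataVisitor no longer walks the logical
plan itself; derived data is now obtained by actually running the plan (see
LocalMapReduceSimulator and IllustratorAttacher in ExampleGenerator below).
For reference, the deleted evaluateOperator() reduced to this pattern:
detach the operator's inputs, splice a PORead over the already derived bag
in front of it, and pull tuples until EOP. A condensed, hedged sketch of
that pattern for the single-input case (the helper class and method are
hypothetical):

    import java.util.List;
    import java.util.Random;

    import org.apache.pig.backend.executionengine.ExecException;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
    import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORead;
    import org.apache.pig.data.BagFactory;
    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.plan.OperatorKey;
    import org.apache.pig.impl.plan.PlanException;

    public class OperatorEvalSketch {
        // Evaluate a single-input physical operator over an in-memory bag by
        // temporarily splicing a PORead in front of it.
        public static DataBag evaluate(PhysicalOperator physOp, DataBag inputBag)
                throws ExecException, PlanException {
            List<PhysicalOperator> savedInputs = physOp.getInputs();
            physOp.setInputs(null);                      // detach the real inputs
            PhysicalPlan phy = new PhysicalPlan();
            phy.add(physOp);
            PORead por = new PORead(new OperatorKey("", new Random().nextLong()), inputBag);
            phy.add(por);
            phy.connect(por, physOp);                    // feed the bag into the operator
            DataBag output = BagFactory.getInstance().newDefaultBag();
            Tuple t = null;
            for (Result res = physOp.getNext(t);
                    res.returnStatus != POStatus.STATUS_EOP;
                    res = physOp.getNext(t)) {
                output.add((Tuple) res.result);          // pull until end-of-plan
            }
            physOp.setInputs(savedInputs);               // re-attach the original inputs
            return output;
        }
    }
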
Modified: pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java (original)
+++ pig/trunk/src/org/apache/pig/pen/EquivalenceClasses.java Mon Dec 13 19:11:00 2010
@@ -19,174 +19,105 @@
 package org.apache.pig.pen;
 
 import java.util.Collection;
-import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.HashMap;
+import java.util.List;
+import java.util.HashSet;
+import java.util.Iterator;
 
-import org.apache.pig.backend.executionengine.ExecException;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
-import org.apache.pig.impl.logicalLayer.LOCogroup;
-import org.apache.pig.impl.logicalLayer.LOFilter;
 import org.apache.pig.impl.logicalLayer.LOForEach;
-import org.apache.pig.impl.logicalLayer.LOLoad;
-import org.apache.pig.impl.logicalLayer.LOSort;
-import org.apache.pig.impl.logicalLayer.LOSplit;
-import org.apache.pig.impl.logicalLayer.LOUnion;
+import org.apache.pig.impl.logicalLayer.LOCross;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
+import org.apache.pig.impl.logicalLayer.LogicalPlan;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.*;
+import org.apache.pig.impl.plan.VisitorException;
 
 
 //These methods are used to generate equivalence classes given the operator name and the output from the operator
 //For example, a filter produces two eq. classes: one for tuples that pass the filter and one for those that don't
 public class EquivalenceClasses {
-    public static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(
-            LogicalOperator op, Map<LogicalOperator, DataBag> derivedData)
-            throws ExecException {
-        if (op instanceof LOCogroup)
-            return getEquivalenceClasses((LOCogroup) op, derivedData);
-        else if (op instanceof LOForEach)
-            return getEquivalenceClasses((LOForEach) op, derivedData);
-        else if (op instanceof LOFilter)
-            return getEquivalenceClasses((LOFilter) op, derivedData);
-        else if (op instanceof LOSort)
-            return getEquivalenceClasses((LOSort) op, derivedData);
-        else if (op instanceof LOSplit)
-            return getEquivalenceClasses((LOSplit) op, derivedData);
-        else if (op instanceof LOUnion)
-            return getEquivalenceClasses((LOUnion) op, derivedData);
-        else if (op instanceof LOLoad)
-            return getEquivalenceClasses((LOLoad) op, derivedData);
-            throw new RuntimeException("Unrecognized logical operator.");
-    }
-
-    static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(LOLoad op,
-            Map<LogicalOperator, DataBag> derivedData) {
-        // Since its a load, all the tuples belong to a single equivalence class
-        Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
-        IdentityHashSet<Tuple> input = new IdentityHashSet<Tuple>();
-
-        equivClasses.add(input);
-
-        DataBag output = derivedData.get(op);
-
-        for (Iterator<Tuple> it = output.iterator(); it.hasNext();) {
-            Tuple t = it.next();
-
-            input.add(t);
-        }
-
-        return equivClasses;
-    }
-
-    static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(
-            LOCogroup op, Map<LogicalOperator, DataBag> derivedData)
-            throws ExecException {
-        Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
-        IdentityHashSet<Tuple> acceptableGroups = new IdentityHashSet<Tuple>();
-
-        equivClasses.add(acceptableGroups);
-
-        for (Iterator<Tuple> it = derivedData.get(op).iterator(); it.hasNext();) {
-            Tuple t = it.next();
-
-            boolean isAcceptable;
-
-            if (t.size() == 2) {
-                isAcceptable = (((DataBag) (t.get(1))).size() >= 2);
+    
+    public static Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> getLoToEqClassMap(PhysicalPlan plan,
+        LogicalPlan lp, Map<LogicalOperator, PhysicalOperator> logToPhyMap,
+        Map<LogicalOperator, DataBag> logToDataMap,
+        Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
+        final HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap)
+        throws VisitorException {
+        Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> ret =
+          new HashMap<LogicalOperator, Collection<IdentityHashSet<Tuple>>>();
+        List<LogicalOperator> roots = lp.getRoots();
+        HashSet<LogicalOperator> seen = new HashSet<LogicalOperator>();
+        for(LogicalOperator lo: roots) {
+            getEqClasses(plan, lo, lp, logToPhyMap, ret, poToEqclassesMap, logToDataMap, forEachInnerLogToPhyMap, seen);
+        }
+        return ret;
+    }
+    
+    private static void getEqClasses(PhysicalPlan plan, LogicalOperator parent, LogicalPlan lp,
+        Map<LogicalOperator, PhysicalOperator> logToPhyMap, Map<LogicalOperator,
+        Collection<IdentityHashSet<Tuple>>> result,
+        final HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap,
+        Map<LogicalOperator, DataBag> logToDataMap,
+        Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap,
+        HashSet<LogicalOperator> seen) throws VisitorException {
+        if (parent instanceof LOForEach) {
+            if (poToEqclassesMap.get(logToPhyMap.get(parent)) != null) {
+                LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                eqClasses.addAll(poToEqclassesMap.get(logToPhyMap.get(parent)));
+                for (Map.Entry<LogicalOperator, PhysicalOperator> entry : forEachInnerLogToPhyMap.get(parent).entrySet()) {
+                    if (poToEqclassesMap.get(entry.getValue()) != null)
+                        eqClasses.addAll(poToEqclassesMap.get(entry.getValue()));
+                }
+                result.put(parent, eqClasses);
+            }
+        } else if (parent instanceof LOCross) {
+            boolean ok = true;
+            for (LogicalOperator input : ((LOCross) parent).getInputs()) {
+                if (logToDataMap.get(input).size() < 2) {
+                    // only if every input has at least two tuples will all outputs be added to the eq. class
+                    ok = false;
+                    break;
+                }
+            }
+            if (ok) {
+                LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
+                for (Iterator<Tuple> it = logToDataMap.get(parent).iterator(); it.hasNext();) {
+                    eqClass.add(it.next());
+                }
+                eqClasses.add(eqClass);
+                result.put(parent, eqClasses);
             } else {
-                isAcceptable = true;
-                for (int field = 1; field < t.size(); field++) {
-                    DataBag bag = (DataBag) t.get(field);
-                    if (bag.size() == 0) {
-                        isAcceptable = false;
-                        break;
-                    }
+                LinkedList<IdentityHashSet<Tuple>> eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                IdentityHashSet<Tuple> eqClass = new IdentityHashSet<Tuple>();
+                eqClasses.add(eqClass);
+                result.put(parent, eqClasses);
+            }
+        } else {
+            Collection<IdentityHashSet<Tuple>> eqClasses = poToEqclassesMap.get(logToPhyMap.get(parent));
+            if (eqClasses == null) {
+                eqClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                int size = ((POPackage)logToPhyMap.get(parent)).getNumInps();
+                for (int i = 0; i < size; i++) {
+                    eqClasses.add(new IdentityHashSet<Tuple>());
                 }
             }
-
-            if (isAcceptable)
-                acceptableGroups.add(t);
-
+            result.put(parent, eqClasses);
         }
-        return equivClasses;
-    }
-
-    static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(
-            LOForEach op, Map<LogicalOperator, DataBag> derivedData) {
-        Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
-        IdentityHashSet<Tuple> equivClass = new IdentityHashSet<Tuple>();
-        equivClasses.add(equivClass);
-
-        for (Iterator<Tuple> it = derivedData.get(op).iterator(); it.hasNext();) {
-            equivClass.add(it.next());
-        }
-
-        return equivClasses;
-    }
-
-    static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(
-            LOFilter op, Map<LogicalOperator, DataBag> derivedData) {
-        Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
-        IdentityHashSet<Tuple> pass = new IdentityHashSet<Tuple>();
-        IdentityHashSet<Tuple> fail = new IdentityHashSet<Tuple>();
-
-        for (Iterator<Tuple> it = derivedData.get(op).iterator(); it.hasNext();) {
-            pass.add(it.next());
-        }
-
-        LogicalOperator input = op.getInput();
-
-        for (Iterator<Tuple> it = derivedData.get(input).iterator(); it
-                .hasNext();) {
-            Tuple t = it.next();
-            if (pass.contains(t))
-                continue;
-            else
-                fail.add(t);
-        }
-
-        equivClasses.add(pass);
-        equivClasses.add(fail);
-
-        return equivClasses;
-
-    }
-
-    static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(LOSort op,
-            Map<LogicalOperator, DataBag> derivedData) {
-        //We don't create any eq. class for sort
-        IdentityHashSet<Tuple> temp = new IdentityHashSet<Tuple>();
-        Collection<IdentityHashSet<Tuple>> output = new LinkedList<IdentityHashSet<Tuple>>();
-        output.add(temp);
-        return output;
-    }
-
-    static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(LOSplit op,
-            Map<LogicalOperator, DataBag> derivedData) {
-        throw new RuntimeException(
-                "LOSplit not supported yet in example generator.");
-    }
-
-    static Collection<IdentityHashSet<Tuple>> getEquivalenceClasses(LOUnion op,
-            Map<LogicalOperator, DataBag> derivedData) {
-
-        // make one equivalence class per input relation
-
-        Collection<IdentityHashSet<Tuple>> equivClasses = new LinkedList<IdentityHashSet<Tuple>>();
-
-        for (LogicalOperator input : op.getInputs()) {
-            IdentityHashSet<Tuple> equivClass = new IdentityHashSet<Tuple>();
-
-            for (Iterator<Tuple> it = derivedData.get(input).iterator(); it
-                    .hasNext();) {
-                equivClass.add(it.next());
+        // result.put(parent, getEquivalenceClasses(plan, parent, lp, logToPhyMap, poToEqclassesMap));
+        if (lp.getSuccessors(parent) != null) {
+            for (LogicalOperator lo : lp.getSuccessors(parent)) {
+                if (!seen.contains(lo)) {
+                    seen.add(lo);
+                    getEqClasses(plan, lo, lp, logToPhyMap, result, poToEqclassesMap, logToDataMap, forEachInnerLogToPhyMap, seen);
+                }
             }
-            equivClasses.add(equivClass);
         }
-
-        return equivClasses;
     }
 }

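Note on the rewrite above: the per-operator getEquivalenceClasses() methods
are gone; equivalence classes now come from the illustrators via
poToEqclassesMap, with LOForEach and LOCross handled specially. The idea the
file comment describes is unchanged: a filter, for instance, still yields a
"pass" class and a "fail" class. A condensed sketch of that filter case,
matching the deleted code (the helper class and method are hypothetical;
sets are identity-based as in the original):

    import java.util.Collection;
    import java.util.Iterator;
    import java.util.LinkedList;

    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.util.IdentityHashSet;

    public class FilterEqClassSketch {
        // Build the two equivalence classes of a filter: tuples that passed
        // (the filter's output) and tuples that did not (input minus output).
        public static Collection<IdentityHashSet<Tuple>> split(DataBag filterOutput,
                DataBag filterInput) {
            IdentityHashSet<Tuple> pass = new IdentityHashSet<Tuple>();
            IdentityHashSet<Tuple> fail = new IdentityHashSet<Tuple>();
            for (Iterator<Tuple> it = filterOutput.iterator(); it.hasNext();)
                pass.add(it.next());
            for (Iterator<Tuple> it = filterInput.iterator(); it.hasNext();) {
                Tuple t = it.next();
                if (!pass.contains(t))    // identity comparison: same object or not
                    fail.add(t);
            }
            Collection<IdentityHashSet<Tuple>> classes = new LinkedList<IdentityHashSet<Tuple>>();
            classes.add(pass);
            classes.add(fail);
            return classes;
        }
    }
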
Modified: pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java (original)
+++ pig/trunk/src/org/apache/pig/pen/ExampleGenerator.java Mon Dec 13 19:11:00 2010
@@ -19,54 +19,77 @@
 package org.apache.pig.pen;
 
 import java.util.HashMap;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Collection;
+import java.util.Iterator;
+import java.io.IOException;
+import org.apache.pig.impl.util.IdentityHashSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.pig.ExecType;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POLoad;
-import org.apache.pig.data.BagFactory;
+import org.apache.pig.backend.hadoop.executionengine.HExecutionEngine;
+
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.PigContext;
+import org.apache.pig.PigException;
+import org.apache.pig.impl.io.FileLocalizer;
 import org.apache.pig.impl.logicalLayer.FrontendException;
+import org.apache.pig.impl.logicalLayer.LOForEach;
 import org.apache.pig.impl.logicalLayer.LOLoad;
 import org.apache.pig.impl.logicalLayer.LogicalOperator;
 import org.apache.pig.impl.logicalLayer.LogicalPlan;
-import org.apache.pig.impl.logicalLayer.PlanSetter;
-import org.apache.pig.impl.logicalLayer.optimizer.LogicalOptimizer;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
-import org.apache.pig.impl.logicalLayer.validators.LogicalPlanValidationExecutor;
-import org.apache.pig.impl.plan.CompilationMessageCollector;
+import org.apache.pig.impl.logicalLayer.LOSort;
+import org.apache.pig.impl.logicalLayer.LOLimit;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.DisplayExamples;
-import org.apache.pig.pen.util.FunctionalLogicalOptimizer;
 import org.apache.pig.pen.util.LineageTracer;
 
+/**
+ * This class is used to generate example tuples for ILLUSTRATE.
+ */
 public class ExampleGenerator {
 
     LogicalPlan plan;
-    Map<LOLoad, DataBag> baseData;
+    Map<LOLoad, DataBag> baseData = null;
     PigContext pigContext;
 
-    Map<LogicalOperator, PhysicalOperator> LogToPhyMap;
     PhysicalPlan physPlan;
+    PhysicalPlanResetter physPlanReseter;
+    private HExecutionEngine execEngine;
+    private LocalMapReduceSimulator localMRRunner;
 
     Log log = LogFactory.getLog(getClass());
 
     private int MAX_RECORDS = 10000;
+    
+    private Map<LogicalOperator, PhysicalOperator> logToPhyMap;
+    private Map<PhysicalOperator, LogicalOperator> poLoadToLogMap;
+    private Map<PhysicalOperator, LogicalOperator> poToLogMap;
+    private HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> poToEqclassesMap;
+    private LineageTracer lineage;
+    private Map<LogicalOperator, DataBag> logToDataMap = null;
+    private Map<LOForEach, Map<LogicalOperator, DataBag>> forEachInnerLogToDataMap;
+    Map<LOForEach, Map<LogicalOperator, PhysicalOperator>> forEachInnerLogToPhyMap;
+    Map<LOLimit, Long> oriLimitMap = null;
+    Map<POLoad, Schema> poLoadToSchemaMap;
 
     public ExampleGenerator(LogicalPlan plan, PigContext hadoopPigContext) {
         this.plan = plan;
 //        pigContext = new PigContext(ExecType.LOCAL, hadoopPigContext
 //                .getProperties());
         pigContext = hadoopPigContext;
+        // pigContext.setExecType(ExecType.LOCAL);
+        FileLocalizer.setInitialized(false);
         try {
             pigContext.connect();
         } catch (ExecException e) {
@@ -74,167 +97,283 @@ public class ExampleGenerator {
                     + e.getLocalizedMessage());
 
         }
-
+        execEngine = new HExecutionEngine(pigContext);
+        localMRRunner = new LocalMapReduceSimulator();
+        poLoadToSchemaMap = new HashMap<POLoad, Schema>();
     }
 
+    public LineageTracer getLineage() {
+        return lineage;
+    }
+    
+    public Map<LogicalOperator, PhysicalOperator> getLogToPhyMap() {
+        return logToPhyMap;
+    }
+    
     public void setMaxRecords(int max) {
         MAX_RECORDS = max;
     }
 
-    public Map<LogicalOperator, DataBag> getExamples() {
-
-        compilePlan(plan);
-
+    public Map<LogicalOperator, DataBag> getExamples() throws IOException, InterruptedException {
+        if (pigContext.getProperties().getProperty("pig.usenewlogicalplan", "true").equals("false"))
+            throw new ExecException("ILLUSTRATE must use the new logical plan!");
+        pigContext.inIllustrator = true;
+        physPlan = compilePlan(plan);
+        physPlanReseter = new PhysicalPlanResetter(physPlan);
         List<LogicalOperator> loads = plan.getRoots();
+        List<PhysicalOperator> pRoots = physPlan.getRoots();
+        if (loads.size() != pRoots.size())
+            throw new ExecException("Logical and Physical plans have different numbers of roots");
+        logToPhyMap = execEngine.getLogToPhyMap();
+        forEachInnerLogToPhyMap = execEngine.getForEachInnerLogToPhyMap();
+        poLoadToLogMap = new HashMap<PhysicalOperator, LogicalOperator>();
+        logToDataMap = new HashMap<LogicalOperator, DataBag>();
+        poToLogMap = new HashMap<PhysicalOperator, LogicalOperator>();
+        
+        // set up foreach inner data map
+        forEachInnerLogToDataMap = new HashMap<LOForEach, Map<LogicalOperator, DataBag>>();
+        for (Map.Entry<LOForEach, Map<LogicalOperator, PhysicalOperator>> entry : forEachInnerLogToPhyMap.entrySet()) {
+            Map<LogicalOperator, DataBag> innerMap = new HashMap<LogicalOperator, DataBag>();
+            forEachInnerLogToDataMap.put(entry.getKey(), innerMap);
+        }
 
+        for (LogicalOperator load : loads) {
+            poLoadToLogMap.put(logToPhyMap.get(load), load);
+        }
+
+        boolean hasLimit = false;
+        for (LogicalOperator lo : logToPhyMap.keySet()) {
+            poToLogMap.put(logToPhyMap.get(lo), lo);
+            if (!hasLimit && lo instanceof LOLimit)
+                hasLimit = true;
+        }
+        
         try {
             readBaseData(loads);
         } catch (ExecException e) {
-            // TODO Auto-generated catch block
             log.error("Error reading data. " + e.getMessage());
-            throw new RuntimeException(e.getMessage());
+            throw e;
         } catch (FrontendException e) {
-            // TODO Auto-generated catch block
             log.error("Error reading data. " + e.getMessage());
+            throw new RuntimeException(e.getMessage());
         }
 
-        DerivedDataVisitor derivedData = null;
+        Map<LogicalOperator, DataBag> derivedData = null;
         try {
 
             // create derived data and trim base data
             LineageTrimmingVisitor trimmer = new LineageTrimmingVisitor(plan,
-                    baseData, LogToPhyMap, physPlan, pigContext);
+                    baseData, this, logToPhyMap, physPlan, pigContext);
             trimmer.visit();
             // System.out.println(
             // "Obtained the first level derived and trimmed data");
             // create new derived data from trimmed basedata
-            derivedData = new DerivedDataVisitor(plan, null, baseData,
-                    trimmer.LogToPhyMap, physPlan);
-            derivedData.visit();
+            derivedData = getData(physPlan);
 
             // System.out.println(
             // "Got new derived data from the trimmed base data");
             // augment base data
             AugmentBaseDataVisitor augment = new AugmentBaseDataVisitor(plan,
-                    baseData, derivedData.derivedData);
+                    logToPhyMap, baseData, derivedData);
             augment.visit();
             this.baseData = augment.getNewBaseData();
             // System.out.println("Obtained augmented base data");
             // create new derived data and trim the base data after augmenting
             // base data with synthetic tuples
-            trimmer = new LineageTrimmingVisitor(plan, baseData,
-                    derivedData.LogToPhyMap, physPlan, pigContext);
+            trimmer = new LineageTrimmingVisitor(plan, baseData, this,
+                    logToPhyMap, physPlan, pigContext);
             trimmer.visit();
             // System.out.println("Final trimming");
             // create the final version of derivedData to give to the output
-            derivedData = new DerivedDataVisitor(plan, null, baseData,
-                    trimmer.LogToPhyMap, physPlan);
-            derivedData.visit();
+            derivedData = getData(physPlan);
             // System.out.println("Obtaining final derived data for output");
+            
+            if (hasLimit) {
+                augment.setLimit();
+                augment.visit();
+                this.baseData = augment.getNewBaseData();
+                oriLimitMap = augment.getOriLimitMap();
+                derivedData = getData();
+            }
 
         } catch (VisitorException e) {
-            // TODO Auto-generated catch block
+            e.printStackTrace(System.out);
             log.error("Visitor exception while creating example data "
                     + e.getMessage());
+            throw new RuntimeException(e.getMessage());
         }
 
         // DisplayExamples.printSimple(plan.getLeaves().get(0),
         // derivedData.derivedData);
         System.out.println(DisplayExamples.printTabular(plan,
-                derivedData.derivedData));
-        return derivedData.derivedData;
+                derivedData, forEachInnerLogToDataMap));
+        pigContext.inIllustrator = false;
+        return derivedData;
     }
 
-    private void readBaseData(List<LogicalOperator> loads) throws ExecException, FrontendException {
-        baseData = new HashMap<LOLoad, DataBag>();
+    private void readBaseData(List<LogicalOperator> loads) throws IOException, InterruptedException, FrontendException, ExecException {
+        PhysicalPlan thisPhyPlan = new PhysicalPlan();
         for (LogicalOperator op : loads) {
             Schema schema = op.getSchema();
             if(schema == null) {
                 throw new ExecException("Example Generator requires a schema. Please provide a schema while loading data.");
             }
-            
-            DataBag opBaseData = BagFactory.getInstance().newDefaultBag();
-
-            POLoad poLoad = (POLoad) LogToPhyMap.get(op);
-//            PigContext oldPC = poLoad.getPc();
-//            poLoad.setPc(pigContext);
-
-            poLoad.setLineageTracer(new LineageTracer());
-
-            Tuple t = null;
-            int count = 0;
-            for (Result res = poLoad.getNext(t); res.returnStatus != POStatus.STATUS_EOP
-                    && count < MAX_RECORDS; res = poLoad.getNext(t)) {
-                if (res.returnStatus == POStatus.STATUS_NULL)
-                    continue;
-                if (res.returnStatus == POStatus.STATUS_ERR) {
-                    log.error("Error reading Tuple");
-                } else {
-                    opBaseData.add((Tuple) res.result);
-                    count++;
-                }
-
+            poLoadToSchemaMap.put((POLoad)logToPhyMap.get(op), schema);
+            thisPhyPlan.add(logToPhyMap.get(op));
+        }
+        baseData = null;
+        Map<LogicalOperator, DataBag> result = getData(thisPhyPlan);
+        baseData = new HashMap<LOLoad, DataBag>();
+        for (LogicalOperator lo : result.keySet()) {
+            if (lo instanceof LOLoad) {
+                baseData.put((LOLoad) lo, result.get(lo));
             }
-            baseData.put((LOLoad) op, opBaseData);
-            // poLoad.setPc(oldPC);
-            poLoad.setLineageTracer(null);
         }
-
     }
 
-    private void compilePlan(LogicalPlan plan) {
-
-        plan = refineLogicalPlan(plan);
-
-        LocalLogToPhyTranslationVisitor visitor = new LocalLogToPhyTranslationVisitor(
-                plan);
-        visitor.setPigContext(pigContext);
-        try {
-            visitor.visit();
-        } catch (VisitorException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
-            log.error("Error visiting the logical plan in ExampleGenerator");
+    PhysicalPlan compilePlan(LogicalPlan plan) throws ExecException, FrontendException {
+        return execEngine.compile(plan, null);
+    }
+    
+    public Map<LogicalOperator, DataBag> getData() throws IOException, InterruptedException {
+        return getData(physPlan);
+    }
+    
+    private Map<LogicalOperator, DataBag> getData(PhysicalPlan plan)
+            throws PigException, IOException, InterruptedException {
+        // get data on a physical plan, possibly trimmed of one branch
+        lineage = new LineageTracer();
+        IllustratorAttacher attacher = new IllustratorAttacher(plan, lineage, MAX_RECORDS, poLoadToSchemaMap, pigContext);
+        attacher.visit();
+        if (oriLimitMap != null) {
+            for (Map.Entry<LOLimit, Long> entry : oriLimitMap.entrySet()) {
+                logToPhyMap.get(entry.getKey()).getIllustrator().setOriginalLimit(entry.getValue());
+            }
         }
-        physPlan = visitor.getPhysicalPlan();
-        LogToPhyMap = visitor.getLogToPhyMap();
+        getLogToDataMap(attacher.getDataMap());
+        if (baseData != null) {
+            setLoadDataMap();
+            physPlanReseter.visit();
+        }
+        localMRRunner.launchPig(plan, baseData, poLoadToLogMap, lineage, attacher, this, pigContext);
+        if (baseData == null)
+            poToEqclassesMap = attacher.poToEqclassesMap;
+        else {
+            for (Map.Entry<PhysicalOperator, Collection<IdentityHashSet<Tuple>>> entry : attacher.poToEqclassesMap.entrySet()) {
+                if (!(entry.getKey() instanceof POLoad))
+                    poToEqclassesMap.put(entry.getKey(), entry.getValue());
+            }
+        }
+        if (baseData != null)
+            // only for non-derived data generation
+            phyToMRTransform(plan, attacher.getDataMap());
+        return logToDataMap;
     }
-
-    private LogicalPlan refineLogicalPlan(LogicalPlan plan) {
-        PlanSetter ps = new PlanSetter(plan);
-        try {
-            ps.visit();
-
-        } catch (VisitorException e) {
-            // TODO Auto-generated catch block
-            e.printStackTrace();
+    
+    public Map<LogicalOperator, DataBag> getData(Map<LOLoad, DataBag> newBaseData) throws Exception {
+        baseData = newBaseData;
+        return getData(physPlan);
+    }
+    
+    private void phyToMRTransform(PhysicalPlan plan, Map<PhysicalOperator, DataBag> phyToDataMap) {
+        // remap LOs to POs, since MR compilation may have replaced the POs inside the MR plans
+        Map<PhysicalOperator, PhysicalOperator> phyToMRMap = localMRRunner.getPhyToMRMap();
+        for (Map.Entry<PhysicalOperator, LogicalOperator> entry : poToLogMap.entrySet()) {
+            if (phyToMRMap.get(entry.getKey()) != null) {
+                PhysicalOperator poInMR = phyToMRMap.get(entry.getKey());
+                logToDataMap.put(entry.getValue(), phyToDataMap.get(poInMR));
+                poToEqclassesMap.put(entry.getKey(), poToEqclassesMap.get(poInMR));
+            }
         }
-
-        // run through validator
-        CompilationMessageCollector collector = new CompilationMessageCollector();
-        FrontendException caught = null;
-        try {
-            boolean isBeforeOptimizer = true;
-            LogicalPlanValidationExecutor validator = new LogicalPlanValidationExecutor(
-                    plan, pigContext, isBeforeOptimizer);
-            validator.validate(plan, collector);
-
-            FunctionalLogicalOptimizer optimizer = new FunctionalLogicalOptimizer(
-                    plan);
-            optimizer.optimize();
-            
-            isBeforeOptimizer = false;
-            validator = new LogicalPlanValidationExecutor(
-                    plan, pigContext, isBeforeOptimizer);
-            validator.validate(plan, collector);
-        } catch (FrontendException fe) {
-            // Need to go through and see what the collector has in it. But
-            // remember what we've caught so we can wrap it into what we
-            // throw.
-            caught = fe;
+    }
+    
+    private void getLogToDataMap(Map<PhysicalOperator, DataBag> phyToDataMap) {
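+        // rebuild the LO-to-data map from the PO-to-data map produced by the attacher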
+        logToDataMap.clear();
+        for (Map.Entry<LogicalOperator, PhysicalOperator> entry : logToPhyMap.entrySet()) {
+            if (entry.getValue() != null)
+                logToDataMap.put(entry.getKey(), phyToDataMap.get(entry.getValue()));
         }
+        
+        // set the LO-to-Data mapping for the ForEach inner plans
+        for (Map.Entry<LOForEach, Map<LogicalOperator, DataBag>> entry : forEachInnerLogToDataMap.entrySet()) {
+            entry.getValue().clear();
+            for (Map.Entry<LogicalOperator, PhysicalOperator> innerEntry : forEachInnerLogToPhyMap.get(entry.getKey()).entrySet()) {
+                entry.getValue().put(innerEntry.getKey(), phyToDataMap.get(innerEntry.getValue()));
+            }
+        }
+    }
+    
+    private void setLoadDataMap() {
+        // Sets up the LO-to-data map, equivalence classes, and lineage for the base
+        // data used by the upcoming runner. Call only after logToDataMap has been
+        // properly (re)set, and before the runner is started.
+        if (baseData != null) {
+            if (poToEqclassesMap == null)
+                poToEqclassesMap = new HashMap<PhysicalOperator, Collection<IdentityHashSet<Tuple>>>();
+            else
+                poToEqclassesMap.clear();
+            for (Map.Entry<LOLoad, DataBag> entry : baseData.entrySet()) {
+                logToDataMap.get(entry.getKey()).addAll(entry.getValue());
+                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                equivalenceClasses.add(equivalenceClass);
+                for (Tuple t : entry.getValue()) {
+                    lineage.insert(t);
+                    equivalenceClass.add(t);
+                }
+                poToEqclassesMap.put(logToPhyMap.get(entry.getKey()), equivalenceClasses);
+            }
+        }
+    }
+    
+    public Collection<IdentityHashSet<Tuple>> getEqClasses() throws VisitorException {
+        Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> logToEqclassesMap = getLoToEqClassMap();
+        LinkedList<IdentityHashSet<Tuple>> ret = new LinkedList<IdentityHashSet<Tuple>>();
+        for (Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> entry :
+            logToEqclassesMap.entrySet()) {
+            if (entry.getValue() != null)
+                ret.addAll(entry.getValue());
+        }
+        return ret;
+    }
 
-        return plan;
-
+    public Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> getLoToEqClassMap() throws VisitorException {
+        Map<LogicalOperator, Collection<IdentityHashSet<Tuple>>> ret =
+            EquivalenceClasses.getLoToEqClassMap(physPlan, plan, logToPhyMap,
+                logToDataMap, forEachInnerLogToPhyMap, poToEqclassesMap);
+
+        // adjust the equivalence classes according to the type of logical operator
+        for (Map.Entry<LogicalOperator, Collection<IdentityHashSet<Tuple>>> entry : ret.entrySet()) {
+            if (entry.getKey() instanceof LOSort) {
+                Collection<IdentityHashSet<Tuple>> eqClasses = entry.getValue();
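+                // an equivalence class holding a single tuple, or only identical
+                // tuples, cannot demonstrate the sort's effect, so empty it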
+                for (Iterator<IdentityHashSet<Tuple>> it = eqClasses.iterator(); it.hasNext(); ) {
+                    Tuple t = null;
+                    IdentityHashSet<Tuple> eqClass = it.next();
+                    if (eqClass.size() == 1) {
+                        eqClass.clear();
+                        continue;
+                    }
+                    boolean first = true, allIdentical = true;
+                    for (Iterator<Tuple> it1 = eqClass.iterator(); it1.hasNext();) {
+                        if (first) {
+                            first = false;
+                            t = it1.next();
+                        } else {
+                            if (!it1.next().equals(t)) {
+                                allIdentical = false;
+                                break;
+                            }
+                        }
+                    }
+                    if (allIdentical)
+                        eqClass.clear();
+                }
+            }
+        }
+        
+        return ret;
     }
 }

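For orientation, the reworked API above boils down to two passes over the plan: getData() derives example data for every operator from the real inputs, and getData(newBaseData) re-runs the plan on augmented base data. A minimal driver sketch, assuming an already-constructed ExampleGenerator (its constructor sits outside this hunk) and an augmented map such as the one produced by AugmentBaseDataVisitor; IllustrateDriverSketch is invented for illustration:

    package org.apache.pig.pen;

    import java.util.Collection;
    import java.util.Map;

    import org.apache.pig.data.DataBag;
    import org.apache.pig.data.Tuple;
    import org.apache.pig.impl.logicalLayer.LOLoad;
    import org.apache.pig.impl.logicalLayer.LogicalOperator;
    import org.apache.pig.impl.util.IdentityHashSet;

    public class IllustrateDriverSketch {
        public static Map<LogicalOperator, DataBag> run(ExampleGenerator eg,
                Map<LOLoad, DataBag> augmentedBaseData) throws Exception {
            // pass 1: derive example data for every operator from the real inputs
            Map<LogicalOperator, DataBag> derived = eg.getData();
            // the flattened equivalence classes can be inspected between passes
            Collection<IdentityHashSet<Tuple>> eqClasses = eg.getEqClasses();
            // pass 2: re-run the plan on the augmented base data
            return eg.getData(augmentedBaseData);
        }
    }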
Added: pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java (added)
+++ pig/trunk/src/org/apache/pig/pen/FakeRawKeyValueIterator.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import org.apache.hadoop.mapred.RawKeyValueIterator;
+import org.apache.hadoop.io.DataInputBuffer;
+import org.apache.hadoop.util.Progress;
+
+public class FakeRawKeyValueIterator implements RawKeyValueIterator {
+    private boolean hasData;
+    
+    public FakeRawKeyValueIterator(boolean hasData) {
+        this.hasData = hasData;
+    }
+    
+    @Override
+    public DataInputBuffer getKey() {
+        return null;
+    }
+    
+    @Override
+    public void close() {
+        // nothing to release
+    }
+    
+    @Override
+    public Progress getProgress() {
+        return null;
+    }
+    
+    @Override
+    public DataInputBuffer getValue() {
+        return null;
+    }
+    
+    @Override
+    public boolean next() {
+        return hasData;
+    }
+}

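FakeRawKeyValueIterator exists so that reduce-side machinery can be instantiated locally (as ILLUSTRATE does) without any real serialized input: next() merely reports whether data is present, and the key/value accessors return nothing. A tiny, purely illustrative smoke test; FakeIteratorDemo is invented:

    package org.apache.pig.pen;

    import org.apache.hadoop.mapred.RawKeyValueIterator;

    public class FakeIteratorDemo {
        public static void main(String[] args) throws Exception {
            RawKeyValueIterator it = new FakeRawKeyValueIterator(true);
            System.out.println(it.next());   // true: the framework believes input exists
            System.out.println(it.getKey()); // null: nothing is actually serialized
        }
    }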
Added: pig/trunk/src/org/apache/pig/pen/Illustrable.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/Illustrable.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/Illustrable.java (added)
+++ pig/trunk/src/org/apache/pig/pen/Illustrable.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import org.apache.pig.data.Tuple;
+
+public interface Illustrable {
+    public void setIllustrator(Illustrator illustrator);
+    /**
+     * Marks up the input tuple so the result can be illustrated.
+     * @param in             input tuple
+     * @param out            output tuple, before being wrapped in an ExampleTuple
+     * @param eqClassIndex   index into the illustrator's equivalence classes
+     *
+     * @return               the marked-up output tuple
+     */
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex);
+}

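To make the contract concrete, here is a hypothetical pass-through implementation of Illustrable. PassThroughExample is invented for illustration only, and it assumes the ExampleTuple(Tuple) constructor and the LineageTracer insert/union calls used elsewhere in the pen package:

    package org.apache.pig.pen;

    import org.apache.pig.data.Tuple;
    import org.apache.pig.pen.util.ExampleTuple;

    public class PassThroughExample implements Illustrable {
        private Illustrator illustrator;

        @Override
        public void setIllustrator(Illustrator illustrator) {
            this.illustrator = illustrator;
        }

        @Override
        public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
            if (illustrator == null)
                return (Tuple) out;                    // not illustrating: no markup
            ExampleTuple tOut = new ExampleTuple((Tuple) out);
            illustrator.getLineage().insert(tOut);     // register the output tuple
            if (in != null)                            // tie output to its input tuple
                illustrator.getLineage().union((Tuple) in, tOut);
            illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
            illustrator.addData(tOut);                 // keep as example output
            return tOut;
        }
    }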
Added: pig/trunk/src/org/apache/pig/pen/Illustrator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/pen/Illustrator.java?rev=1045314&view=auto
==============================================================================
--- pig/trunk/src/org/apache/pig/pen/Illustrator.java (added)
+++ pig/trunk/src/org/apache/pig/pen/Illustrator.java Mon Dec 13 19:11:00 2010
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pig.pen;
+
+import java.util.LinkedList;
+import java.util.ArrayList;
+
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
+
+import org.apache.pig.data.BagFactory;
+import org.apache.pig.data.DataBag;
+import org.apache.pig.data.Tuple;
+import org.apache.pig.impl.PigContext;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.impl.logicalLayer.schema.Schema;
+
+/**
+ * Used by physical operators to generate and mark up example tuples during
+ * ILLUSTRATE: it accumulates the example data, equivalence classes, and
+ * lineage for a single operator.
+ */
+public class Illustrator {
+
+    private LineageTracer lineage;
+    private LinkedList<IdentityHashSet<Tuple>> equivalenceClasses;
+    // all input tuples for an expression
+    private IdentityHashSet<Tuple> inputs = null;
+    
+    private DataBag data;
+    private int maxRecords = -1;
+    private int recCounter = 0;
+    private IllustratorAttacher attacher;
+    private ArrayList<Boolean[]> subExpResults;
+    private Boolean[] subExpResult;
+    private boolean eqClassesShared;
+    private long oriLimit = -1;
+    private Schema schema;
+
+    public Illustrator(LineageTracer lineage, LinkedList<IdentityHashSet<Tuple>> equivalenceClasses, IllustratorAttacher attacher, PigContext hadoopPigContext) {
+        this.lineage = lineage;
+        this.equivalenceClasses = equivalenceClasses;
+        data = BagFactory.getInstance().newDefaultBag();
+        this.attacher = attacher;
+        subExpResults = new ArrayList<Boolean[]>();
+        subExpResult = new Boolean[1];
+        schema = null;
+    }
+    
+    public Illustrator(LineageTracer lineage, LinkedList<IdentityHashSet<Tuple>> equivalenceClasses, int maxRecords, IllustratorAttacher attacher,
+        Schema schema, PigContext hadoopPigContext) {
+        this(lineage, equivalenceClasses, attacher, hadoopPigContext);
+        this.maxRecords = maxRecords;
+        this.schema = schema;
+    }
+    
+    public ArrayList<Boolean[]> getSubExpResults() {
+        return subExpResults;
+    }
+    
+    Boolean[] getSubExpResult() {
+        return subExpResult;
+    }
+    
+    public LineageTracer getLineage() {
+        return lineage;
+    }
+    
+    public LinkedList<IdentityHashSet<Tuple>> getEquivalenceClasses() {
+        return equivalenceClasses;
+    }
+    
+    public void setSubExpResult(boolean result) {
+        subExpResult[0] = result;
+    }
+    
+    public void setEquivalenceClasses(LinkedList<IdentityHashSet<Tuple>> eqClasses, PhysicalOperator po) {
+        equivalenceClasses = eqClasses;
+        attacher.poToEqclassesMap.put(po, eqClasses);
+    }
+    
+    public boolean ceilingCheck() {
+        // counts this record and reports whether it is still within the
+        // maxRecords ceiling (-1 means no ceiling); note that the counter
+        // only advances when a ceiling is set
+        return maxRecords == -1 || ++recCounter <= maxRecords;
+    }
+    
+    public IdentityHashSet<Tuple> getInputs() {
+        return inputs;
+    }
+    
+    public void addInputs(IdentityHashSet<Tuple> inputs) {
+        if (this.inputs == null)
+            this.inputs = new IdentityHashSet<Tuple>();
+        this.inputs.addAll(inputs);
+    }
+    
+    public void addData(Tuple t) {
+        data.add(t);
+    }
+    
+    public DataBag getData() {
+        return data;
+    }
+    
+    public long getOriginalLimit() {
+        return oriLimit;
+    }
+    
+    public void setOriginalLimit(long oriLimit) {
+        this.oriLimit = oriLimit;
+    }
+    
+    public void setEqClassesShared() {
+        eqClassesShared = true;
+    }
+    
+    public boolean getEqClassesShared() {
+        return eqClassesShared;
+    }
+    
+    public Schema getSchema() {
+        return schema;
+    }
+}
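
Taken together, an operator typically drives an Illustrator with a loop of this shape; a minimal sketch, where IllustratorLoopSketch, TupleReader, and nextTuple() are stand-ins for the operator's real input path:

    package org.apache.pig.pen;

    import org.apache.pig.data.Tuple;

    public class IllustratorLoopSketch {
        interface TupleReader { Tuple nextTuple(); }  // stand-in for the real source

        static void collect(TupleReader reader, Illustrator illustrator) {
            Tuple t;
            while ((t = reader.nextTuple()) != null) {
                if (!illustrator.ceilingCheck())
                    break;                  // maxRecords examples already collected
                illustrator.addData(t);     // keep the tuple as example output
            }
        }
    }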