You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/13 20:11:04 UTC

svn commit: r1045314 [2/5] - in /pig/trunk: ./ src/org/apache/pig/ src/org/apache/pig/backend/hadoop/executionengine/ src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/ src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src...

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java Mon Dec 13 19:11:00 2010
@@ -72,8 +72,14 @@ public class PONot extends UnaryComparis
         if(res.returnStatus != POStatus.STATUS_OK || res.result == null) {
             return res;
         }
-        if (((Boolean)res.result).booleanValue()) return falseRes;
-        else return trueRes;
+        if (((Boolean)res.result).booleanValue()) {
+          illustratorMarkup(null, falseRes.result, 1);
+          return falseRes;
+        }
+        else {
+          illustratorMarkup(null, trueRes.result, 0);
+          return trueRes;
+        }
     }
 
     @Override

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java Mon Dec 13 19:11:00 2010
@@ -78,9 +78,19 @@ public class POOr extends BinaryComparis
         // 3) f    t n f
         
         // Short circuit. if lhs is true, return true - ROW 1 above is handled with this
-        if (left.result != null && ((Boolean)left.result).booleanValue()) return left;
+        boolean returnLeft = false;
+        if (left.result != null && ((Boolean)left.result).booleanValue()) {
+          if (illustrator == null)
+              return left;
+          
+          illustratorMarkup(null, left.result, 0);
+          returnLeft = true;;
+        }
         
         Result right = rhs.getNext(dummyBool);
+        if (returnLeft)
+            return left;
+
         // pass on ERROR and EOP 
         if(right.returnStatus != POStatus.STATUS_OK && right.returnStatus != POStatus.STATUS_NULL) {
             return right;
@@ -94,6 +104,8 @@ public class POOr extends BinaryComparis
         
         // No matter what, what we get from the right side is what we'll
         // return, null, true, or false.
+        if (right.result != null)
+            illustratorMarkup(null, right.result, (Boolean) right.result ? 0 : 1);
         return right;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
@@ -38,6 +39,8 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
 
 /**
  * Implements the overloaded form of the project operator.
@@ -143,6 +146,7 @@ public class POProject extends Expressio
             return res;
         }
         if (star) {
+            illustratorMarkup(inpValue, res.result, -1);
             return res;
         } else if(columns.size() == 1) {
             try {
@@ -188,6 +192,7 @@ public class POProject extends Expressio
             ret = tupleFactory.newTuple(objList);
         }
         res.result = ret;
+        illustratorMarkup(inpValue, res.result, -1);
         return res;
     }
 
@@ -494,4 +499,11 @@ public class POProject extends Expressio
         return null;
     }
 
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+            // NOTE(review): empty branch - nothing is recorded for the illustrator here; confirm this is intended
+        }
+        return null; // null is returned whether or not illustrator is set; all three arguments are unused
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java Mon Dec 13 19:11:00 2010
@@ -86,6 +86,7 @@ public class PORegexp extends BinaryComp
         } else {
             left.result = falseRef;
         }
+        illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
         return left;
     }    
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java Mon Dec 13 19:11:00 2010
@@ -38,6 +38,7 @@ import org.apache.pig.impl.PigContext;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
 
 //We intentionally skip type checking in backend for performance reasons
 @SuppressWarnings("unchecked")
@@ -84,6 +85,9 @@ public class POUserComparisonFunc extend
         // the two attached tuples are used up now. So we set the
         // inputAttached flag to false
         inputAttached = false;
+        if (result.returnStatus == POStatus.STATUS_OK)
+            illustratorMarkup(null, result.result, 
+                (Integer) result.result == 0 ? 0 : (Integer) result.result > 0 ? 1 : 2);
         return result;
 
     }
@@ -192,4 +196,14 @@ public class POUserComparisonFunc extend
         return null;
     }
 
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) { // 'in' and 'out' are not read; the fields t1 and t2 are recorded instead
+            illustrator.getInputs().add(t1);
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add(t1);
+            illustrator.getInputs().add(t2);
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add(t2); // t1 and t2 land in the same class
+        }
+        return null; // no tuple is handed back to the caller
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Dec 13 19:11:00 2010
@@ -480,4 +480,13 @@ public class POUserFunc extends Expressi
     public void setResultType(byte resultType) {
         this.resultType = resultType;
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return (Tuple) out; // pass-through: 'in' and eqClassIndex are ignored and nothing is recorded
+    }
+    
+    public EvalFunc getFunc() {
+        return func; // hands out the 'func' field itself (declared outside this hunk), not a copy
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java Mon Dec 13 19:11:00 2010
@@ -17,7 +17,9 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
 
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * This is a base class for all unary comparison operators. Supports the
@@ -49,4 +51,12 @@ public abstract class UnaryComparisonOpe
     public void setOperandType(byte operandType) {
         this.operandType = operandType;
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+            // NOTE(review): empty branch - nothing is recorded for the illustrator here; confirm this is intended
+        }
+        return null; // null is returned whether or not illustrator is set; all three arguments are unused
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Mon Dec 13 19:11:00 2010
@@ -130,18 +130,6 @@ public class PhyPlanVisitor extends Plan
 	public void visitDistinct(PODistinct distinct) throws VisitorException {
         //do nothing		
 	}
-	
-    public void visitPenCross(org.apache.pig.pen.physicalOperators.POCross cross) throws VisitorException {
-        //do nothing
-    }
-    
-    public void visitPenCogroup(org.apache.pig.pen.physicalOperators.POCogroup cogroup) throws VisitorException {
-        //do nothing
-    }
-    
-    public void visitPenSplit(org.apache.pig.pen.physicalOperators.POSplit split) throws VisitorException {
-        //do nothing
-    }
 
 	public void visitRead(PORead read) throws VisitorException {
         //do nothing		
@@ -297,24 +285,9 @@ public class PhyPlanVisitor extends Plan
 	}
 
     /**
-     * @param lrfi
-     * @throws VisitorException 
-     */
-    public void visitLocalRearrangeForIllustrate(
-            POLocalRearrangeForIllustrate lrfi) throws VisitorException {
-        List<PhysicalPlan> inpPlans = lrfi.getPlans();
-        for (PhysicalPlan plan : inpPlans) {
-            pushWalker(mCurrentWalker.spawnChildWalker(plan));
-            visit();
-            popWalker();
-        }
-        
-    }
-
-    /**
      * @param optimizedForEach
      */
-    public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) {
+    public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
         // TODO Auto-generated method stub
         
     }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Iterator;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -37,6 +38,9 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * The collected group operator is a special operator used when users give
@@ -247,7 +251,7 @@ public class POCollectedGroup extends Ph
             outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag() 
                     : new InternalCachedBag(1);
             outputBag.add((Tuple)tup.get(1));
-
+            illustratorMarkup(null, tup2, 0);
             return res;
         }
 
@@ -298,4 +302,33 @@ public class POCollectedGroup extends Ph
             leafOps.add(leaf);
         }            
    }
+    
+    private void setIllustratorEquivalenceClasses(Tuple tin) {
+        if (illustrator != null) { // does nothing when no illustrator is set
+          illustrator.getEquivalenceClasses().get(0).add(tin); // always targets equivalence class 0
+        }
+    }
+    
+    @Override // illustrator hook: wraps 'out', records its lineage and derives its synthetic flag
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { // 'in', eqClassIndex unused
+        if (illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out); // 'out' is treated as a Tuple throughout
+            LineageTracer lineage = illustrator.getLineage();
+            lineage.insert(tOut);
+            DataBag bag;
+            try {
+                bag = (DataBag) tOut.get(1); // field 1 is read as the bag of grouped tuples
+            } catch (ExecException e) {
+                throw new RuntimeException("Illustrator markup exception" + e.getMessage(), e); // keep cause
+            }
+            boolean synthetic = false;
+            for (Iterator<Tuple> it = bag.iterator(); !synthetic && it.hasNext();) { // one iterator, so the scan advances
+                synthetic |= ((ExampleTuple) it.next()).synthetic; // loop ends at the first synthetic tuple
+            }
+            tOut.synthetic = synthetic;
+            illustrator.addData((Tuple) out);
+            return tOut;
+        } else
+          return (Tuple) out; // no illustrator: pass the tuple through untouched
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Mon Dec 13 19:11:00 2010
@@ -341,5 +341,11 @@ public class PODemux extends PhysicalOpe
     public boolean isInCombiner() {
         return inCombiner;
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // nothing needs to be done here
+        return null;
+    }
         
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Mon Dec 13 19:11:00 2010
@@ -19,7 +19,9 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
+import java.util.HashSet;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -32,12 +34,17 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.data.BagFactory;
 import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.DistinctDataBag;
 import org.apache.pig.data.InternalDistinctBag;
 import org.apache.pig.data.InternalSortedBag;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * Find the distinct set of tuples in a bag.
@@ -105,6 +112,7 @@ public class PODistinct extends Physical
                     continue;
                 }
                 distinctBag.add((Tuple) in.result);
+                illustratorMarkup(in.result, in.result, 0);
                 in = processInput();
             }
             inputsAccumulated = true;
@@ -159,4 +167,12 @@ public class PODistinct extends Physical
         return new PODistinct(new OperatorKey(this.mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)), this.requestedParallelism, this.inputs);
     }
 
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) { // 'in' is not read; only 'out' is recorded
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add((Tuple) out);
+            illustrator.addData((Tuple) out);
+        }
+        return null; // callers get no marked-up tuple back
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Mon Dec 13 19:11:00 2010
@@ -445,4 +445,10 @@ public class POFRJoin extends PhysicalOp
     public void setReplFiles(FileSpec[] replFiles) {
         this.replFiles = replFiles;
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        // no-op: all handled by the preceding POForEach
+        return null; // arguments are ignored
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Mon Dec 13 19:11:00 2010
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.List;
+import java.util.LinkedList;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
@@ -25,12 +26,13 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * This is an implementation of the Filter operator. It has an Expression Plan
@@ -151,13 +153,10 @@ public class POFilter extends PhysicalOp
                     && res.returnStatus != POStatus.STATUS_NULL) 
                 return res;
 
-            if (res.result != null && (Boolean) res.result == true) {
-        	if(lineageTracer != null) {
-        	    ExampleTuple tIn = (ExampleTuple) inp.result;
-        	    lineageTracer.insert(tIn);
-        	    lineageTracer.union(tIn, tIn);
-        	}
-                return inp;
+            if (res.result != null) {
+                illustratorMarkup(inp.result, inp.result, (Boolean) res.result ? 0 : 1);
+                if ((Boolean) res.result)
+                  return inp;
             }
         }
         return inp;
@@ -194,4 +193,20 @@ public class POFilter extends PhysicalOp
     public PhysicalPlan getPlan() {
         return plan;
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+      if (illustrator != null) {
+          int index = 0; // bitmask: bit i is set when entry i of getSubExpResults() has a false first element
+          for (int i = 0; i < illustrator.getSubExpResults().size(); ++i) {
+              if (!illustrator.getSubExpResults().get(i)[0])
+                  index += (1 << i);
+          }
+          if (index < illustrator.getEquivalenceClasses().size()) // masks beyond the class list are skipped
+              illustrator.getEquivalenceClasses().get(index).add((Tuple) in);
+          if (eqClassIndex == 0) // add only qualified record
+              illustrator.addData((Tuple) out);
+      }
+      return (Tuple) out; // 'out' is returned in every case, with or without an illustrator
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Mon Dec 13 19:11:00 2010
@@ -47,7 +47,9 @@ import org.apache.pig.impl.plan.Dependen
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
 //We intentionally skip type checking in backend for performance reasons
 @SuppressWarnings("unchecked")
@@ -93,6 +95,8 @@ public class POForEach extends PhysicalO
     
     protected transient AccumulativeTupleBuffer buffer;
     
+    protected Tuple inpTuple;
+    
     public POForEach(OperatorKey k) {
         this(k,-1,null,null);
     }
@@ -209,13 +213,6 @@ public class POForEach extends PhysicalO
                 res = processPlan();               
                 
                 if(res.returnStatus==POStatus.STATUS_OK) {
-                    if(lineageTracer !=  null && res.result != null) {
-                    ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
-                    tOut.synthetic = tIn.synthetic;
-                    lineageTracer.insert(tOut);
-                    lineageTracer.union(tOut, tIn);
-                    res.result = tOut;
-                    }
                     return res;
                 }
                 if(res.returnStatus==POStatus.STATUS_EOP) {
@@ -247,18 +244,18 @@ public class POForEach extends PhysicalO
             }
                        
             attachInputToPlans((Tuple) inp.result);
-            Tuple tuple = (Tuple)inp.result;
+            inpTuple = (Tuple)inp.result;
             
             for (PhysicalOperator po : opsToBeReset) {
                 po.reset();
             }
             
             if (isAccumulative()) {            	
-                for(int i=0; i<tuple.size(); i++) {            		
-                    if (tuple.getType(i) == DataType.BAG) {
+                for(int i=0; i<inpTuple.size(); i++) {            		
+                    if (inpTuple.getType(i) == DataType.BAG) {
                         // we only need to check one bag, because all the bags
                         // share the same buffer
-                        buffer = ((AccumulativeBag)tuple.get(i)).getTuplebuffer();
+                        buffer = ((AccumulativeBag)inpTuple.get(i)).getTuplebuffer();
                         break;
                     }
                 }
@@ -272,8 +269,9 @@ public class POForEach extends PhysicalO
                         }
                         
                         setAccumStart();                		
-                    }else{                
-                        buffer.clear();
+                    }else{
+                        inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
+ //                       buffer.clear();
                         setAccumEnd();                		
                     }
                     
@@ -292,16 +290,6 @@ public class POForEach extends PhysicalO
             }
             
             processingPlan = true;
-
-            if(lineageTracer != null && res.result != null) {
-            //we check for res.result since that can also be null in the case of flatten
-            tIn = (ExampleTuple) inp.result;
-            ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
-            tOut.synthetic = tIn.synthetic;
-            lineageTracer.insert(tOut);
-            lineageTracer.union(tOut, tIn);
-            res.result = tOut;
-            }
             
             return res;
         }
@@ -482,12 +470,10 @@ public class POForEach extends PhysicalO
             } else
                 out.append(in);
         }
-        
-        if(lineageTracer != null) {
-            ExampleTuple tOut = new ExampleTuple();
-            tOut.reference(out);
-        }
-        return out;
+        if (inpTuple != null)
+            return illustratorMarkup(inpTuple, out, 0);
+        else
+            return illustratorMarkup2(data, out);
     }
 
     
@@ -699,4 +685,43 @@ public class POForEach extends PhysicalO
     public void setOpsToBeReset(List<PhysicalOperator> opsToBeReset) {
         this.opsToBeReset = opsToBeReset;
     }
+    
+    private Tuple illustratorMarkup2(Object[] in, Object out) {
+      if(illustrator != null) {
+          ExampleTuple tOut = new ExampleTuple((Tuple) out);
+          illustrator.getLineage().insert(tOut);
+          boolean synthetic = false; // becomes true if any element of 'in' is synthetic
+          for (Object tIn : in)
+          {
+              synthetic |= ((ExampleTuple) tIn).synthetic;
+              illustrator.getLineage().union(tOut, (Tuple) tIn);
+          }
+          illustrator.addData(tOut);
+          int i;
+          for (i = 0; i < noItems; ++i) // stop at the first bag whose size() is below 2
+              if (((DataBag)bags[i]).size() < 2)
+                  break;
+          if (i >= noItems && !illustrator.getEqClassesShared()) // reached only when no bag had size() < 2
+              illustrator.getEquivalenceClasses().get(0).add(tOut);
+          tOut.synthetic = synthetic;
+          return tOut;
+      } else
+          return (Tuple) out; // no illustrator: output is returned unwrapped
+    }
+
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { // eqClassIndex is not read; class 0 is used
+        if(illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            illustrator.addData(tOut);
+            if (!illustrator.getEqClassesShared())
+                illustrator.getEquivalenceClasses().get(0).add(tOut);
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            tOut.synthetic = ((ExampleTuple) in).synthetic; // output inherits the input's synthetic flag
+            lineageTracer.union((ExampleTuple) in , tOut);
+            return tOut;
+        } else
+          return (Tuple) out; // no illustrator: output is returned unwrapped
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java Mon Dec 13 19:11:00 2010
@@ -105,4 +105,9 @@ public class POGlobalRearrange extends P
         // TODO Auto-generated method stub
         return super.getNext(t);
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+      return null; // no-op: arguments are ignored and nothing is recorded
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Mon Dec 13 19:11:00 2010
@@ -17,6 +17,7 @@
  */
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -32,6 +33,9 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
 public class POLimit extends PhysicalOperator {
 	   /**
@@ -87,7 +91,9 @@ public class POLimit extends PhysicalOpe
                     || inp.returnStatus == POStatus.STATUS_ERR)
                 break;
             
-            if (soFar>=mLimit)
+            illustratorMarkup(inp.result, null, 0);
+            // illustrator ignore LIMIT before the post processing
+            if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar>=mLimit)
             	inp.returnStatus = POStatus.STATUS_EOP;
             
             soFar++;
@@ -131,4 +137,14 @@ public class POLimit extends PhysicalOpe
         newLimit.setAlias(alias);
         return newLimit;
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+            ExampleTuple tIn = (ExampleTuple) in;
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add(tIn);
+            illustrator.addData((Tuple) in);
+        }
+        return (Tuple) in;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Mon Dec 13 19:11:00 2010
@@ -135,10 +135,9 @@ public class POLoad extends PhysicalOper
             }
             else
                 res.returnStatus = POStatus.STATUS_OK;
-            if(lineageTracer != null) {
-        	ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
-        	res.result = tOut;
-            }
+
+            if (res.returnStatus == POStatus.STATUS_OK)
+                res.result = illustratorMarkup(res, res.result, 0);
         } catch (IOException e) {
             log.error("Received error from loader function: " + e);
             return res;
@@ -199,4 +198,37 @@ public class POLoad extends PhysicalOper
     public LoadFunc getLoadFunc(){
         return this.loader;
     }
+    
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+          if (!illustrator.ceilingCheck()) {
+              ((Result) in).returnStatus = POStatus.STATUS_EOP;
+              return null;
+          }
+          if (illustrator.getSchema() == null || illustrator.getSchema().size() <= ((Tuple) out).size()) {
+              boolean hasNull = false;
+              for (int i = 0; i < ((Tuple) out).size(); ++i)
+                  try {
+                      if (((Tuple) out).get(i) == null)
+                      {
+                          hasNull = true;
+                          break;
+                      }
+                  } catch (ExecException e) {
+                      hasNull = true;
+                      break;
+                  }
+              if (!hasNull) {
+                  ExampleTuple tOut = new ExampleTuple((Tuple) out);
+                  illustrator.getLineage().insert(tOut);
+                  illustrator.addData((Tuple) tOut);
+                  illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+                  return tOut;
+              } else
+                  return (Tuple) out;
+          } else
+              return (Tuple) out;
+        } else
+          return (Tuple) out;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Mon Dec 13 19:11:00 2010
@@ -41,6 +41,7 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.pen.util.ExampleTuple;
 
 /**
  * The local rearrange operator is a part of the co-group
@@ -380,6 +381,7 @@ public class POLocalRearrange extends Ph
             if(secondaryPlans != null)
                 detachPlans(secondaryPlans);
             
+            res.result = illustratorMarkup(inp.result, res.result, 0);
             return res;
         }
         return inp;
@@ -445,7 +447,10 @@ public class POLocalRearrange extends Ph
             //Put the key and the indexed tuple
             //in a tuple and return
             lrOutput.set(1, key);
-            lrOutput.set(2, mFakeTuple);
+            if (illustrator != null)
+                lrOutput.set(2, key);
+            else
+                lrOutput.set(2, mFakeTuple);
             return lrOutput;
         } else if(isCross){
         
@@ -486,6 +491,7 @@ public class POLocalRearrange extends Ph
                             minimalValue.append(value.get(i));
                         }
                     }
+                    minimalValue = illustratorMarkup(value, minimalValue, -1);
                 } else {
                     // for the project star case
                     // we would send out an empty tuple as
@@ -799,4 +805,19 @@ public class POLocalRearrange extends Ph
         this.stripKeyFromValue = stripKeyFromValue;
     }
 
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if (illustrator != null) {
+            if (!(out instanceof ExampleTuple))
+            {
+                ExampleTuple tOut = new ExampleTuple((Tuple) out);
+                illustrator.getLineage().insert(tOut);
+                illustrator.addData(tOut);
+                illustrator.getLineage().union(tOut, (Tuple) in);
+                tOut.synthetic = ((ExampleTuple) in).synthetic;
+                return tOut;
+            }
+        }
+        return (Tuple) out;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Mon Dec 13 19:11:00 2010
@@ -22,6 +22,8 @@ import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
 import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.PriorityQueue;
 import java.util.Properties;
@@ -53,7 +55,10 @@ import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
 public class POMergeCogroup extends PhysicalOperator {
 
@@ -279,7 +284,7 @@ public class POMergeCogroup extends Phys
         for(int i=0; i < relationCnt; i++)
             out.set(i+1,(outBags[i]));
 
-        return new Result(POStatus.STATUS_OK, out);        
+        return new Result(POStatus.STATUS_OK, illustratorMarkup(null, out, -1));        
     }
 
 
@@ -592,4 +597,38 @@ public class POMergeCogroup extends Phys
             System.out.println(i+++"th item in heap: "+ copy.poll());
         }
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert((Tuple) out);
+            Tuple tmp;
+            boolean synthetic = false;
+            try {
+                for (int i = 1; i < relationCnt; i++)
+                {
+                    DataBag dbs = (DataBag) ((Tuple) out).get(i);
+                    Iterator<Tuple> iter = dbs.iterator();
+                    while (iter.hasNext()) {
+                        tmp = iter.next();
+                        // any of synthetic data in bags causes the output tuple to be synthetic
+                        if (!synthetic && ((ExampleTuple)tmp).synthetic)
+                            synthetic = true;
+                        lineageTracer.union(tOut, tmp);
+                        // TODO constraint of >=2 tuples per eq. class
+                        illustrator.getEquivalenceClasses().get(i-1).add(tmp);
+                    }
+                }
+            } catch (ExecException e) {
+              // TODO better exception handling
+              throw new RuntimeException("Illustrator exception :"+e.getMessage());
+            }
+            tOut.synthetic = synthetic;
+            illustrator.addData((Tuple) tOut);
+            return tOut;
+        } else
+            return (Tuple) out;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Mon Dec 13 19:11:00 2010
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
 import java.io.IOException;
 import java.io.ObjectInputStream;
 import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -37,6 +39,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataBag;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.data.TupleFactory;
@@ -47,7 +50,10 @@ import org.apache.pig.impl.plan.NodeIdGe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.PlanException;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
 /** This operator implements merge join algorithm to do map side joins. 
  *  Currently, only two-way joins are supported. One input of join is identified as left
@@ -204,6 +210,7 @@ public class POMergeJoin extends Physica
                 for(int i=0; i < rightTupSize; i++)
                     joinedTup.set(i+leftTupSize, curJoiningRightTup.get(i));
 
+                joinedTup = illustratorMarkup(null, joinedTup, 0);
                 return new Result(POStatus.STATUS_OK, joinedTup);
             }
             // Join with current right input has ended. But bag of left tuples
@@ -555,4 +562,28 @@ public class POMergeJoin extends Physica
     public String getIndexFile() {
         return indexFile;
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            tOut.synthetic = ((ExampleTuple) out).synthetic;
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            try {
+                for (int i = 0; i < leftTupSize+rightTupSize; i++)
+                {
+                    lineageTracer.union(tOut,  (Tuple) tOut.get(i));
+                    illustrator.getEquivalenceClasses().get(i).add((Tuple)tOut);
+                    // TODO constraint of >=2 tuples per eq. class
+                }
+            } catch (ExecException e) {
+              // TODO better exception handling
+              throw new RuntimeException("Illustrator exception :"+e.getMessage());
+            }
+            illustrator.addData((Tuple) tOut);
+            return tOut;
+        }
+        return (Tuple) out;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Mon Dec 13 19:11:00 2010
@@ -270,7 +270,7 @@ public class POMultiQueryPackage extends
             myObj.setIndex(origIndex);
             tuple.set(0, myObj);
         }
-        
+        // illustrator markup has been handled by "pack"
         return res;
     }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java Mon Dec 13 19:11:00 2010
@@ -22,6 +22,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.util.Utils;
+import org.apache.pig.data.Tuple;
 
 public class PONative extends PhysicalOperator {
     
@@ -72,4 +73,8 @@ public class PONative extends PhysicalOp
         return false;
     }
 
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.LinkedList;
 
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.DataType;
@@ -31,6 +32,8 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.util.IdentityHashSet;
 
 /**
  * A specialized version of POForeach with the difference
@@ -97,13 +100,6 @@ public class POOptimizedForEach extends 
             while(true) {
                 res = processPlan();
                 if(res.returnStatus==POStatus.STATUS_OK) {
-                    if(lineageTracer !=  null && res.result != null) {
-                	ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
-                	tOut.synthetic = tIn.synthetic;
-                	lineageTracer.insert(tOut);
-                	lineageTracer.union(tOut, tIn);
-                	res.result = tOut;
-                    }
                     return res;
                 }
                 if(res.returnStatus==POStatus.STATUS_EOP) {
@@ -132,16 +128,6 @@ public class POOptimizedForEach extends 
             res = processPlan();
             
             processingPlan = true;
-
-            if(lineageTracer != null && res.result != null) {
-        	//we check for res.result since that can also be null in the case of flatten
-        	tIn = (ExampleTuple) input;
-        	ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
-        	tOut.synthetic = tIn.synthetic;
-        	lineageTracer.insert(tOut);
-        	lineageTracer.union(tOut, tIn);
-        	res.result = tOut;
-            }
             
             return res;
         }
@@ -171,6 +157,4 @@ public class POOptimizedForEach extends 
             NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
             requestedParallelism, plans, flattens);
     }
-
-    
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon Dec 13 19:11:00 2010
@@ -21,11 +21,10 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.data.AccumulativeBag;
 import org.apache.pig.data.BagFactory;
@@ -44,7 +43,11 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
 /**
  * The package operator that packages
  * the globally rearranged tuples into
@@ -112,8 +115,6 @@ public class POPackage extends PhysicalO
     // "value" to column numbers in the "key" which contain the fields in
     // the "value"
     protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
-    
-    transient private final Log log = LogFactory.getLog(getClass());
 
     protected static final BagFactory mBagFactory = BagFactory.getInstance();
     protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -304,10 +305,13 @@ public class POPackage extends PhysicalO
                 res.set(i+1,bag);
             }
         }
-        detachInput();
         Result r = new Result();
-        r.result = res;
         r.returnStatus = POStatus.STATUS_OK;
+        if (!isAccumulative())
+            r.result = illustratorMarkup(null, res, 0);
+        else
+            r.result = res;
+        detachInput();
         return r;
     }
 
@@ -355,19 +359,19 @@ public class POPackage extends PhysicalO
                     }
                 }
             }
-            
+            copy = illustratorMarkup2(val, copy);
         } else if (isProjectStar) {
             
             // the whole "value" is present in the "key"
             copy = mTupleFactory.newTuple(keyAsTuple.getAll());
-            
+            copy = illustratorMarkup2(keyAsTuple, copy);
         } else {
             
             // there is no field of the "value" in the
             // "key" - so just make a copy of what we got
             // as the "value"
             copy = mTupleFactory.newTuple(val.getAll());
-            
+            copy = illustratorMarkup2(val, copy);
         }
         return copy;
     }
@@ -462,7 +466,7 @@ public class POPackage extends PhysicalO
         return pkgType;
     }
     
-    private class POPackageTupleBuffer implements AccumulativeTupleBuffer {
+    class POPackageTupleBuffer implements AccumulativeTupleBuffer {
         private List<Tuple>[] bags;
         private Iterator<NullableTuple> iter;
         private int batchSize;
@@ -529,5 +533,79 @@ public class POPackage extends PhysicalO
         public Iterator<Tuple> getTuples(int index) {			
             return bags[index].iterator();
         }
+        
+        public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+            return POPackage.this.illustratorMarkup(in, out, eqClassIndex);
+        }
        };
+       
+       private Tuple illustratorMarkup2(Object in, Object out) {
+           if(illustrator != null) {
+               ExampleTuple tOut = new ExampleTuple((Tuple) out);
+               illustrator.getLineage().insert(tOut);
+               tOut.synthetic = ((ExampleTuple) in).synthetic;
+               illustrator.getLineage().union(tOut, (Tuple) in);
+               return tOut;
+           } else
+               return (Tuple) out;
+       }
+       
+       @Override
+       public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+           if(illustrator != null) {
+               ExampleTuple tOut = new ExampleTuple((Tuple) out);
+               LineageTracer lineageTracer = illustrator.getLineage();
+               lineageTracer.insert(tOut);
+               Tuple tmp;
+               boolean synthetic = false;
+               if (illustrator.getEquivalenceClasses() == null) {
+                   LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                   for (int i = 0; i < numInputs; ++i) {
+                       IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                       equivalenceClasses.add(equivalenceClass);
+                   }
+                   illustrator.setEquivalenceClasses(equivalenceClasses, this);
+               }
+
+               if (distinct) {
+                   int count;
+                   for (count = 0; tupIter.hasNext(); ++count) {
+                       NullableTuple ntp = tupIter.next();
+                       tmp = (Tuple) ntp.getValueAsPigType();
+                       if (!tmp.equals(tOut))
+                           lineageTracer.union(tOut, tmp);
+                   }
+                   if (count > 1) // only non-distinct tuples are inserted into the equivalence class
+                       illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+                   illustrator.addData((Tuple) tOut);
+                   return (Tuple) tOut;
+               }
+               boolean outInEqClass = true;
+               try {
+                   for (int i = 1; i < numInputs+1; i++)
+                   {
+                       DataBag dbs = (DataBag) ((Tuple) out).get(i);
+                       Iterator<Tuple> iter = dbs.iterator();
+                       if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
+                           outInEqClass = false;
+                       while (iter.hasNext()) {
+                           tmp = iter.next();
+                           // any of synthetic data in bags causes the output tuple to be synthetic
+                           if (!synthetic && ((ExampleTuple)tmp).synthetic)
+                               synthetic = true;
+                           lineageTracer.union(tOut, tmp);
+                       }
+                   }
+               } catch (ExecException e) {
+                 // TODO better exception handling
+                 throw new RuntimeException("Illustrator exception :"+e.getMessage());
+               }
+               if (outInEqClass)
+                   illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+               tOut.synthetic = synthetic;
+               illustrator.addData((Tuple) tOut);
+               return tOut;
+           } else
+               return (Tuple) out;
+       }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java Mon Dec 13 19:11:00 2010
@@ -22,10 +22,13 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Map.Entry;
+
+import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.impl.util.Pair;
 
 import org.apache.pig.PigException;
@@ -42,6 +45,8 @@ import org.apache.pig.impl.io.NullableTu
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * This package operator is a specialization
@@ -173,8 +178,8 @@ public class POPackageLite extends POPac
         res.set(1,db);
         detachInput();
         Result r = new Result();
-        r.result = res;
         r.returnStatus = POStatus.STATUS_OK;
+        r.result = illustratorMarkup(null, res, 0);
         return r;
     }
     
@@ -202,5 +207,27 @@ public class POPackageLite extends POPac
                 + DataType.findTypeName(keyType) + "}" + " - "
                 + mKey.toString();
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+            ExampleTuple tOut = new ExampleTuple((Tuple) out);
+            LineageTracer lineageTracer = illustrator.getLineage();
+            lineageTracer.insert(tOut);
+            if (illustrator.getEquivalenceClasses() == null) {
+                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                for (int i = 0; i < numInputs; ++i) {
+                    IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                    equivalenceClasses.add(equivalenceClass);
+                }
+                illustrator.setEquivalenceClasses(equivalenceClasses, this);
+            }
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+            tOut.synthetic = false;  // not expect this to be really used
+            illustrator.addData((Tuple) tOut);
+            return tOut;
+        } else
+            return (Tuple) out;
+    }
 }
 

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Mon Dec 13 19:11:00 2010
@@ -234,4 +234,8 @@ public class POPreCombinerLocalRearrange
         }            
     }
 
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java Mon Dec 13 19:11:00 2010
@@ -103,4 +103,8 @@ public class PORead extends PhysicalOper
         v.visitRead(this);
     }
 
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        return null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java Mon Dec 13 19:11:00 2010
@@ -27,6 +27,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
 import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
@@ -121,4 +122,8 @@ public class POSkewedJoin extends Physic
 		return inputSchema.get(i);
 	}
 
+	@Override
+  public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+	    return null;
+	}
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Mon Dec 13 19:11:00 2010
@@ -21,6 +21,7 @@ import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -47,6 +48,8 @@ import org.apache.pig.data.TupleFactory;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.NodeIdGenerator;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * This implementation is applicable for both the physical plan and for the
@@ -301,10 +304,7 @@ public class POSort extends PhysicalOper
         }
         if (it.hasNext()) {
             res.result = it.next();
-            if(lineageTracer != null) {
-                lineageTracer.insert((Tuple) res.result);
-                lineageTracer.union((Tuple)res.result, (Tuple)res.result);
-            }
+            illustratorMarkup(res.result, res.result, 0);
             res.returnStatus = POStatus.STATUS_OK;
         } else {
             res.returnStatus = POStatus.STATUS_EOP;
@@ -407,4 +407,12 @@ public class POSort extends PhysicalOper
     public SortInfo getSortInfo() {
         return sortInfo;
     }
+    
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+          illustrator.getEquivalenceClasses().get(eqClassIndex).add((Tuple) in);
+            illustrator.addData((Tuple) out);
+        }
+        return (Tuple) out;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.util.ArrayList;
 import java.util.BitSet;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.commons.logging.Log;
@@ -33,6 +34,9 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * The MapReduce Split operator.
@@ -314,5 +318,11 @@ public class POSplit extends PhysicalOpe
         return res;
                 
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+      // no op  
+      return null;
+    }
         
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.List;
+import java.util.LinkedList;
 
 import org.apache.hadoop.mapreduce.Counter;
 import org.apache.pig.PigException;
@@ -36,6 +37,9 @@ import org.apache.pig.impl.io.FileSpec;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 
 /**
  * The store operator which is used in two ways:
@@ -132,8 +136,12 @@ public class POStore extends PhysicalOpe
         try {
             switch (res.returnStatus) {
             case POStatus.STATUS_OK:
-                storer.putNext((Tuple)res.result);
+                if (illustrator == null) {
+                    storer.putNext((Tuple)res.result);
+                } else
+                    illustratorMarkup(res.result, res.result, 0);
                 res = empty;
+
                 if (outputRecordCounter != null) {
                     outputRecordCounter.increment(1);
                 }
@@ -250,4 +258,17 @@ public class POStore extends PhysicalOpe
     public boolean isMultiStore() {
         return isMultiStore;
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+            ExampleTuple tIn = (ExampleTuple) in;
+            LineageTracer lineage = illustrator.getLineage();
+            lineage.insert(tIn);
+            if (!isTmpStore)
+                illustrator.getEquivalenceClasses().get(eqClassIndex).add(tIn);
+            illustrator.addData((Tuple) out);
+        }
+        return (Tuple) out;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Dec 13 19:11:00 2010
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
 
 import java.io.IOException;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Properties;
 import java.util.concurrent.ArrayBlockingQueue;
@@ -31,6 +32,9 @@ import org.apache.pig.impl.plan.Operator
 import org.apache.pig.impl.plan.VisitorException;
 import org.apache.pig.impl.streaming.ExecutableManager;
 import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -158,7 +162,8 @@ public class POStream extends PhysicalOp
                     // we don't need to set any flag noting we saw all output
                     // from binary
                     r = EOP_RESULT;
-                }
+                } else if (r.returnStatus == POStatus.STATUS_OK)
+                    illustratorMarkup(r.result, r.result, 0);
                 return(r);
             }
             
@@ -207,7 +212,8 @@ public class POStream extends PhysicalOp
                     // should never be called. So we don't need to set any 
                     // flag noting we saw all output from binary
                     r = EOP_RESULT;
-                }
+                } else if (r.returnStatus == POStatus.STATUS_OK)
+                  illustratorMarkup(r.result, r.result, 0);
                 return r;
             } else {
                 // we are not being called from close() - so
@@ -222,7 +228,8 @@ public class POStream extends PhysicalOp
                     // for future calls
                     r = EOP_RESULT;
                     allOutputFromBinaryProcessed  = true;
-                }
+                } else if (r.returnStatus == POStatus.STATUS_OK)
+                    illustratorMarkup(r.result, r.result, 0);
                 return r;
             }
             
@@ -348,4 +355,14 @@ public class POStream extends PhysicalOp
     public BlockingQueue<Result> getBinaryOutputQueue() {
         return binaryOutputQueue;
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+      if(illustrator != null) {
+          ExampleTuple tIn = (ExampleTuple) in;
+          illustrator.getEquivalenceClasses().get(eqClassIndex).add(tIn);
+            illustrator.addData((Tuple) out);
+      }
+      return (Tuple) out;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java Mon Dec 13 19:11:00 2010
@@ -18,6 +18,7 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.BitSet;
+import java.util.LinkedList;
 import java.util.List;
 
 import org.apache.pig.backend.executionengine.ExecException;
@@ -29,6 +30,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
 import org.apache.pig.pen.util.ExampleTuple;
 
 /**
@@ -165,6 +167,7 @@ public class POUnion extends PhysicalOpe
 
                     if(res.returnStatus == POStatus.STATUS_OK || 
                             res.returnStatus == POStatus.STATUS_NULL || res.returnStatus == POStatus.STATUS_ERR) {
+                        illustratorMarkup(res.result, res.result, ind);
                         return res;
                     }
 
@@ -181,14 +184,29 @@ public class POUnion extends PhysicalOpe
             res.returnStatus = POStatus.STATUS_OK;
             detachInput();
             nextReturnEOP = true ;
-            if(lineageTracer != null) {
-        	ExampleTuple tOut = (ExampleTuple) res.result;
-        	lineageTracer.insert(tOut);
-        	lineageTracer.union(tOut, tOut);
-            }
+            illustratorMarkup(res.result, res.result, 0);
             return res;
         }
 
 
     }
+    
+    @Override
+    public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+        if(illustrator != null) {
+            if (illustrator.getEquivalenceClasses() == null) {
+                int size = (inputs == null ? 1 : inputs.size());
+                LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+                for (int i = 0; i < size; ++i) {
+                    IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                    equivalenceClasses.add(equivalenceClass);
+                }
+                illustrator.setEquivalenceClasses(equivalenceClasses, this);
+            }
+            ExampleTuple tIn = (ExampleTuple) in;
+            illustrator.getEquivalenceClasses().get(eqClassIndex).add(tIn);
+            illustrator.addData((Tuple) out);
+        }
+        return null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/data/AccumulativeBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/AccumulativeBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/AccumulativeBag.java Mon Dec 13 19:11:00 2010
@@ -68,7 +68,9 @@ public class AccumulativeBag implements 
     }
 
     public long size() {		
-        throw new RuntimeException("AccumulativeBag does not support size() operation");
+        int size = 0;
+        for (Iterator<Tuple> it = iterator(); it.hasNext(); it.next(), ++size);
+        return size;
     }
 
     public long getMemorySize() {	

Modified: pig/trunk/src/org/apache/pig/data/TupleFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/TupleFactory.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/TupleFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/TupleFactory.java Mon Dec 13 19:11:00 2010
@@ -75,6 +75,24 @@ public abstract class TupleFactory {
                     throw new RuntimeException("Unable to instantiate "
                         + "tuple factory " + factoryName, e);
                 }
+            } else if (factoryName != null) {
+                try {
+                    Class c = Class.forName(factoryName);
+                    Object o = c.newInstance();
+                    if (!(o instanceof TupleFactory)) {
+                        throw new RuntimeException("Provided factory " +
+                            factoryName + " does not extend TupleFactory!");
+                    }
+                    gSelf = (TupleFactory)o;
+                } catch (Exception e) {
+                    if (e instanceof RuntimeException) {
+                      // We just threw this
+                      RuntimeException re = (RuntimeException)e;
+                      throw re;
+                    }
+                    throw new RuntimeException("Unable to instantiate "
+                        + "tuple factory " + factoryName, e);
+                }
             } else {
                 gSelf = new BinSedesTupleFactory();
             }

Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Mon Dec 13 19:11:00 2010
@@ -115,11 +115,14 @@ public class PigContext implements Seria
     
     public int defaultParallel = -1;
 
-    // Says, wether we're processing an explain right now. Explain
+    // Says, whether we're processing an explain right now. Explain
     // might skip some check in the logical plan validation (file
     // existence checks, etc).
     public boolean inExplain = false;
     
+    // whether we're processing an ILLUSTRATE right now.
+    public boolean inIllustrator = false;
+    
     private String last_alias = null;
 
     // List of paths skipped for automatic shipping

Modified: pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java Mon Dec 13 19:11:00 2010
@@ -18,16 +18,17 @@
 package org.apache.pig.impl.builtin;
 
 import java.io.IOException;
+import java.util.Map;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.pig.EvalFunc;
 import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
 import org.apache.pig.data.DataType;
 import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.io.InterStorage;
 import org.apache.pig.impl.io.ReadToEndLoader;
 import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.data.DataBag;
 
 /**
  * ReadScalars reads a line from a file and returns it as its value. The
@@ -39,6 +40,9 @@ public class ReadScalars extends EvalFun
     private String scalarfilename = null;
   //  private String charset = "UTF-8";
     private Object value = null;
+    
+    // in-core input : used by illustrator
+    private Map<String, DataBag> inputBuffer = null;
 
     /**
      * Java level API
@@ -54,6 +58,24 @@ public class ReadScalars extends EvalFun
                 return null;
 
             int pos;
+            if (inputBuffer != null)
+            {
+                pos = DataType.toInteger(input.get(0));
+                scalarfilename = DataType.toString(input.get(1));
+                DataBag inputBag = inputBuffer.get(scalarfilename);
+                if (inputBag == null || inputBag.size() ==0)
+                {
+                    log.warn("No scalar field to read, returning null");
+                    return null;
+                } else if (inputBag.size() > 1) {
+                    String msg = "Scalar has more than one row in the output.";
+                    throw new ExecException(msg);
+                }
+                Tuple t1 = inputBag.iterator().next();
+                value = t1.get(pos);
+                return value;
+            }
+            
             ReadToEndLoader loader;
             try {
                 pos = DataType.toInteger(input.get(0));
@@ -92,4 +114,8 @@ public class ReadScalars extends EvalFun
         return value;
     }
 
+    public void setOutputBuffer(Map<String, DataBag> inputBuffer) {
+        this.inputBuffer = inputBuffer;
+        value = null;
+    }
 }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java Mon Dec 13 19:11:00 2010
@@ -105,6 +105,10 @@ public class ForeachInnerPlanVisitor ext
         return childPlanVisitor.exprPlan;
     }
     
+    public Map<LogicalOperator, LogicalRelationalOperator> getInnerOpMap() {
+        return innerOpsMap;
+    }
+    
     public void visit(LOProject project) throws VisitorException {
         LogicalOperator op = project.getExpression();
         

Modified: pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Mon Dec 13 19:11:00 2010
@@ -60,13 +60,23 @@ import org.apache.pig.newplan.logical.re
 public class LogicalPlanMigrationVistor extends LOVisitor { 
     private org.apache.pig.newplan.logical.relational.LogicalPlan logicalPlan;
     private Map<LogicalOperator, LogicalRelationalOperator> opsMap;
+    private Map<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> forEachInnerMap;
    
     public LogicalPlanMigrationVistor(LogicalPlan plan) {
         super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
         logicalPlan = new org.apache.pig.newplan.logical.relational.LogicalPlan();
         opsMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+        forEachInnerMap = new HashMap<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>>();
     }    
     
+    public Map<LogicalOperator, LogicalRelationalOperator> getOldToNewLOOpMap() {
+        return opsMap;
+    }
+    
+    public Map<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> getForEachInnerMap() {
+        return forEachInnerMap;
+    }
+    
     private void translateConnection(LogicalOperator oldOp, org.apache.pig.newplan.Operator newOp) {       
         List<LogicalOperator> preds = mPlan.getPredecessors(oldOp); 
         
@@ -227,12 +237,14 @@ public class LogicalPlanMigrationVistor 
         innerPlan.add(gen);                
         
         List<LogicalPlan> ll = forEach.getForEachPlans();
+        Map<LogicalOperator, LogicalRelationalOperator> innerMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+        forEachInnerMap.put(forEach, innerMap);
         try {
             for(int i=0; i<ll.size(); i++) {
                 LogicalPlan lp = ll.get(i);
                 ForeachInnerPlanVisitor v = new ForeachInnerPlanVisitor(newForeach, forEach, lp, mPlan, opsMap);
                 v.visit();
-                
+                innerMap.putAll(v.getInnerOpMap());
                 expPlans.add(v.exprPlan);
             }
         } catch (FrontendException e) {

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Mon Dec 13 19:11:00 2010
@@ -267,6 +267,10 @@ public class LOCogroup extends LogicalRe
         return mGroupType;
     }
     
+    public void resetGroupType() {
+        mGroupType = GROUPTYPE.REGULAR;
+    }
+    
     /**
      * Returns an Unmodifiable Map of Input Number to Uid 
      * @return Unmodifiable Map<Integer,Long>

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Mon Dec 13 19:11:00 2010
@@ -82,6 +82,10 @@ public class LOJoin extends LogicalRelat
         return mJoinType;
     }
     
+    public void resetJoinType() {
+        mJoinType = JOINTYPE.HASH;
+    }
+    
     public Collection<LogicalExpressionPlan> getJoinPlan(int inputIndex) {
         return mJoinPlans.get(inputIndex);
     }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Mon Dec 13 19:11:00 2010
@@ -105,6 +105,10 @@ public class LogToPhyTranslationVisitor 
         this.pc = pc;
     }
 
+    public Map<Operator, PhysicalOperator> getLogToPhyMap() {
+        return logToPhyMap;
+    }
+    
     public PhysicalPlan getPhysicalPlan() {
         return currentPlan;
     }

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java Mon Dec 13 19:11:00 2010
@@ -28,7 +28,6 @@ import java.util.Set;
 import org.apache.pig.newplan.Operator;
 import org.apache.pig.newplan.OperatorPlan;
 import org.apache.pig.newplan.OperatorSubPlan;
-import org.apache.pig.newplan.logical.expression.LogicalExpression;
 import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
 import org.apache.pig.newplan.logical.expression.ProjectExpression;
 import org.apache.pig.newplan.logical.relational.LOForEach;

Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java Mon Dec 13 19:11:00 2010
@@ -119,6 +119,8 @@ public abstract class TypeCastInserter e
             foreach.setAlias(op.getAlias());
             
             // Insert the foreach into the plan and patch up the plan.
+            if (currentPlan.getSuccessors(op) == null)
+                return;
             Operator next = currentPlan.getSuccessors(op).get(0);
             currentPlan.insertBetween(op, foreach, next);