You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ya...@apache.org on 2010/12/13 20:11:04 UTC
svn commit: r1045314 [2/5] - in /pig/trunk: ./ src/org/apache/pig/
src/org/apache/pig/backend/hadoop/executionengine/
src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/ src...
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PONot.java Mon Dec 13 19:11:00 2010
@@ -72,8 +72,14 @@ public class PONot extends UnaryComparis
if(res.returnStatus != POStatus.STATUS_OK || res.result == null) {
return res;
}
- if (((Boolean)res.result).booleanValue()) return falseRes;
- else return trueRes;
+ if (((Boolean)res.result).booleanValue()) {
+ illustratorMarkup(null, falseRes.result, 1);
+ return falseRes;
+ }
+ else {
+ illustratorMarkup(null, trueRes.result, 0);
+ return trueRes;
+ }
}
@Override
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POOr.java Mon Dec 13 19:11:00 2010
@@ -78,9 +78,19 @@ public class POOr extends BinaryComparis
// 3) f t n f
// Short circuit. if lhs is true, return true - ROW 1 above is handled with this
- if (left.result != null && ((Boolean)left.result).booleanValue()) return left;
+ boolean returnLeft = false;
+ if (left.result != null && ((Boolean)left.result).booleanValue()) {
+ if (illustrator == null)
+ return left;
+
+ illustratorMarkup(null, left.result, 0);
+ returnLeft = true;;
+ }
Result right = rhs.getNext(dummyBool);
+ if (returnLeft)
+ return left;
+
// pass on ERROR and EOP
if(right.returnStatus != POStatus.STATUS_OK && right.returnStatus != POStatus.STATUS_NULL) {
return right;
@@ -94,6 +104,8 @@ public class POOr extends BinaryComparis
// No matter what, what we get from the right side is what we'll
// return, null, true, or false.
+ if (right.result != null)
+ illustratorMarkup(null, right.result, (Boolean) right.result ? 0 : 1);
return right;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POProject.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
@@ -38,6 +39,8 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
/**
* Implements the overloaded form of the project operator.
@@ -143,6 +146,7 @@ public class POProject extends Expressio
return res;
}
if (star) {
+ illustratorMarkup(inpValue, res.result, -1);
return res;
} else if(columns.size() == 1) {
try {
@@ -188,6 +192,7 @@ public class POProject extends Expressio
ret = tupleFactory.newTuple(objList);
}
res.result = ret;
+ illustratorMarkup(inpValue, res.result, -1);
return res;
}
@@ -494,4 +499,11 @@ public class POProject extends Expressio
return null;
}
+ /**
+  * Illustrator hook for this operator. Nothing is recorded here, whether or
+  * not an illustrator is attached, and no tuple is handed back.
+  *
+  * @param in           unused
+  * @param out          unused
+  * @param eqClassIndex unused
+  * @return always {@code null}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     // Deliberate no-op.
+     return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/PORegexp.java Mon Dec 13 19:11:00 2010
@@ -86,6 +86,7 @@ public class PORegexp extends BinaryComp
} else {
left.result = falseRef;
}
+ illustratorMarkup(null, left.result, (Boolean) left.result ? 0 : 1);
return left;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserComparisonFunc.java Mon Dec 13 19:11:00 2010
@@ -38,6 +38,7 @@ import org.apache.pig.impl.PigContext;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
//We intentionally skip type checking in backend for performance reasons
@SuppressWarnings("unchecked")
@@ -84,6 +85,9 @@ public class POUserComparisonFunc extend
// the two attached tuples are used up now. So we set the
// inputAttached flag to false
inputAttached = false;
+ if (result.returnStatus == POStatus.STATUS_OK)
+ illustratorMarkup(null, result.result,
+ (Integer) result.result == 0 ? 0 : (Integer) result.result > 0 ? 1 : 2);
return result;
}
@@ -192,4 +196,14 @@ public class POUserComparisonFunc extend
return null;
}
+ /**
+  * Registers the fields {@code t1} and {@code t2} with the illustrator: each
+  * one is added to the illustrator's inputs and to the equivalence class
+  * selected by {@code eqClassIndex}. Does nothing when no illustrator is
+  * attached.
+  *
+  * @param in           unused
+  * @param out          unused
+  * @param eqClassIndex index of the equivalence class both tuples are added to
+  * @return always {@code null}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return null;
+     }
+     // First operand, then second: inputs list first, equivalence class second.
+     illustrator.getInputs().add(t1);
+     illustrator.getEquivalenceClasses().get(eqClassIndex).add(t1);
+     illustrator.getInputs().add(t2);
+     illustrator.getEquivalenceClasses().get(eqClassIndex).add(t2);
+     return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/POUserFunc.java Mon Dec 13 19:11:00 2010
@@ -480,4 +480,13 @@ public class POUserFunc extends Expressi
public void setResultType(byte resultType) {
this.resultType = resultType;
}
+
+ /**
+  * Illustrator hook for user functions. Nothing is recorded; the output is
+  * handed straight back.
+  *
+  * @param in           unused
+  * @param out          the value to return, cast to {@link Tuple}
+  * @param eqClassIndex unused
+  * @return {@code out} cast to {@link Tuple}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     Tuple passedThrough = (Tuple) out;
+     return passedThrough;
+ }
+
+ /**
+  * @return the value of the {@code func} field
+  */
+ public EvalFunc getFunc() {
+     return this.func;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/expressionOperators/UnaryComparisonOperator.java Mon Dec 13 19:11:00 2010
@@ -17,7 +17,9 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.OperatorKey;
+import org.apache.pig.impl.util.IdentityHashSet;
/**
* This is a base class for all unary comparison operators. Supports the
@@ -49,4 +51,12 @@ public abstract class UnaryComparisonOpe
public void setOperandType(byte operandType) {
this.operandType = operandType;
}
+
+ /**
+  * Illustrator hook shared by the unary comparison operators. Nothing is
+  * recorded here, whether or not an illustrator is attached.
+  *
+  * @param in           unused
+  * @param out          unused
+  * @param eqClassIndex unused
+  * @return always {@code null}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     // Deliberate no-op.
+     return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Mon Dec 13 19:11:00 2010
@@ -130,18 +130,6 @@ public class PhyPlanVisitor extends Plan
public void visitDistinct(PODistinct distinct) throws VisitorException {
//do nothing
}
-
- public void visitPenCross(org.apache.pig.pen.physicalOperators.POCross cross) throws VisitorException {
- //do nothing
- }
-
- public void visitPenCogroup(org.apache.pig.pen.physicalOperators.POCogroup cogroup) throws VisitorException {
- //do nothing
- }
-
- public void visitPenSplit(org.apache.pig.pen.physicalOperators.POSplit split) throws VisitorException {
- //do nothing
- }
public void visitRead(PORead read) throws VisitorException {
//do nothing
@@ -297,24 +285,9 @@ public class PhyPlanVisitor extends Plan
}
/**
- * @param lrfi
- * @throws VisitorException
- */
- public void visitLocalRearrangeForIllustrate(
- POLocalRearrangeForIllustrate lrfi) throws VisitorException {
- List<PhysicalPlan> inpPlans = lrfi.getPlans();
- for (PhysicalPlan plan : inpPlans) {
- pushWalker(mCurrentWalker.spawnChildWalker(plan));
- visit();
- popWalker();
- }
-
- }
-
- /**
* @param optimizedForEach
*/
- public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) {
+ public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
// TODO Auto-generated method stub
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POCollectedGroup.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.List;
+import java.util.Iterator;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
@@ -37,6 +38,9 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.util.IdentityHashSet;
/**
* The collected group operator is a special operator used when users give
@@ -247,7 +251,7 @@ public class POCollectedGroup extends Ph
outputBag = useDefaultBag ? BagFactory.getInstance().newDefaultBag()
: new InternalCachedBag(1);
outputBag.add((Tuple)tup.get(1));
-
+ illustratorMarkup(null, tup2, 0);
return res;
}
@@ -298,4 +302,33 @@ public class POCollectedGroup extends Ph
leafOps.add(leaf);
}
}
+
+ /**
+  * Adds {@code tin} to the illustrator's first equivalence class (index 0).
+  * Does nothing when no illustrator is attached.
+  *
+  * @param tin tuple to add
+  */
+ private void setIllustratorEquivalenceClasses(Tuple tin) {
+     if (illustrator == null) {
+         return;
+     }
+     illustrator.getEquivalenceClasses().get(0).add(tin);
+ }
+
+ /**
+  * Wraps the operator output in an {@link ExampleTuple}, inserts the wrapper
+  * into the illustrator's lineage, and marks it synthetic when any tuple in
+  * the bag at field 1 of the output is synthetic. The unwrapped {@code out}
+  * is what gets added to the illustrator's data.
+  *
+  * @param in           unused
+  * @param out          the output tuple; field 1 is read as a {@link DataBag}
+  * @param eqClassIndex unused
+  * @return the {@link ExampleTuple} wrapper, or {@code out} itself when no
+  *         illustrator is attached
+  * @throws RuntimeException if field 1 of the output cannot be read
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     // Wrap as a Tuple, the way the other operators' illustratorMarkup methods do.
+     ExampleTuple tOut = new ExampleTuple((Tuple) out);
+     LineageTracer lineage = illustrator.getLineage();
+     lineage.insert(tOut);
+     DataBag bag;
+     try {
+         bag = (DataBag) tOut.get(1);
+     } catch (ExecException e) {
+         // Keep the original exception as the cause so its stack trace survives.
+         throw new RuntimeException("Illustrator markup exception" + e.getMessage(), e);
+     }
+     // Walk the bag with a single iterator. Asking the bag for a new iterator
+     // on every step would look at the first tuple over and over and never
+     // finish for a non-empty bag whose first tuple is not synthetic.
+     boolean synthetic = false;
+     Iterator<Tuple> members = bag.iterator();
+     while (!synthetic && members.hasNext()) {
+         synthetic = ((ExampleTuple) members.next()).synthetic;
+     }
+     tOut.synthetic = synthetic;
+     illustrator.addData((Tuple) out);
+     return tOut;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Mon Dec 13 19:11:00 2010
@@ -341,5 +341,11 @@ public class PODemux extends PhysicalOpe
public boolean isInCombiner() {
return inCombiner;
}
+
+ /**
+  * Illustrator hook; this operator has nothing to record.
+  *
+  * @param in           unused
+  * @param out          unused
+  * @param eqClassIndex unused
+  * @return always {@code null}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     // Nothing needs to be done here.
+     return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODistinct.java Mon Dec 13 19:11:00 2010
@@ -19,7 +19,9 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
+import java.util.HashSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -32,12 +34,17 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.data.BagFactory;
import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.DistinctDataBag;
import org.apache.pig.data.InternalDistinctBag;
import org.apache.pig.data.InternalSortedBag;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.util.IdentityHashSet;
/**
* Find the distinct set of tuples in a bag.
@@ -105,6 +112,7 @@ public class PODistinct extends Physical
continue;
}
distinctBag.add((Tuple) in.result);
+ illustratorMarkup(in.result, in.result, 0);
in = processInput();
}
inputsAccumulated = true;
@@ -159,4 +167,12 @@ public class PODistinct extends Physical
return new PODistinct(new OperatorKey(this.mKey.scope, NodeIdGenerator.getGenerator().getNextNodeId(this.mKey.scope)), this.requestedParallelism, this.inputs);
}
+ /**
+  * Records {@code out} with the illustrator: it is added to the equivalence
+  * class selected by {@code eqClassIndex} and to the illustrator's data.
+  * Does nothing when no illustrator is attached.
+  *
+  * @param in           unused
+  * @param out          the tuple to record
+  * @param eqClassIndex index of the equivalence class to add the tuple to
+  * @return always {@code null}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return null;
+     }
+     illustrator.getEquivalenceClasses().get(eqClassIndex).add((Tuple) out);
+     illustrator.addData((Tuple) out);
+     return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Mon Dec 13 19:11:00 2010
@@ -445,4 +445,10 @@ public class POFRJoin extends PhysicalOp
public void setReplFiles(FileSpec[] replFiles) {
this.replFiles = replFiles;
}
+
+ /**
+  * Illustrator hook; records nothing for this operator.
+  *
+  * @param in           unused
+  * @param out          unused
+  * @param eqClassIndex unused
+  * @return always {@code null}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     // No-op: everything is handled by the preceding POForEach.
+     return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Mon Dec 13 19:11:00 2010
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.List;
+import java.util.LinkedList;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
@@ -25,12 +26,13 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
-import org.apache.pig.backend.hadoop.executionengine.physicalLayer.expressionOperators.ComparisonOperator;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
/**
* This is an implementation of the Filter operator. It has an Expression Plan
@@ -151,13 +153,10 @@ public class POFilter extends PhysicalOp
&& res.returnStatus != POStatus.STATUS_NULL)
return res;
- if (res.result != null && (Boolean) res.result == true) {
- if(lineageTracer != null) {
- ExampleTuple tIn = (ExampleTuple) inp.result;
- lineageTracer.insert(tIn);
- lineageTracer.union(tIn, tIn);
- }
- return inp;
+ if (res.result != null) {
+ illustratorMarkup(inp.result, inp.result, (Boolean) res.result ? 0 : 1);
+ if ((Boolean) res.result)
+ return inp;
}
}
return inp;
@@ -194,4 +193,20 @@ public class POFilter extends PhysicalOp
public PhysicalPlan getPlan() {
return plan;
}
+
+ /**
+  * Files the input tuple under an equivalence class derived from the
+  * illustrator's sub-expression results, and records the output tuple as
+  * data when {@code eqClassIndex} is 0. Does nothing but return {@code out}
+  * when no illustrator is attached.
+  *
+  * The class index is built bitwise: bit {@code i} is set when element 0 of
+  * the i-th sub-expression result is false. The tuple is only filed when
+  * that index falls inside the list of equivalence classes.
+  *
+  * @param in           tuple to add to the selected equivalence class
+  * @param out          tuple to record as data and to return
+  * @param eqClassIndex 0 marks a qualified record; any other value skips
+  *                     the data recording
+  * @return {@code out} cast to {@link Tuple}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     int classIdx = 0;
+     int pos = 0;
+     while (pos < illustrator.getSubExpResults().size()) {
+         boolean subExpHeld = illustrator.getSubExpResults().get(pos)[0];
+         if (!subExpHeld) {
+             classIdx += (1 << pos);
+         }
+         ++pos;
+     }
+     if (classIdx < illustrator.getEquivalenceClasses().size()) {
+         illustrator.getEquivalenceClasses().get(classIdx).add((Tuple) in);
+     }
+     // add only qualified record
+     if (eqClassIndex == 0) {
+         illustrator.addData((Tuple) out);
+     }
+     return (Tuple) out;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POForEach.java Mon Dec 13 19:11:00 2010
@@ -47,7 +47,9 @@ import org.apache.pig.impl.plan.Dependen
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
//We intentionally skip type checking in backend for performance reasons
@SuppressWarnings("unchecked")
@@ -93,6 +95,8 @@ public class POForEach extends PhysicalO
protected transient AccumulativeTupleBuffer buffer;
+ protected Tuple inpTuple;
+
public POForEach(OperatorKey k) {
this(k,-1,null,null);
}
@@ -209,13 +213,6 @@ public class POForEach extends PhysicalO
res = processPlan();
if(res.returnStatus==POStatus.STATUS_OK) {
- if(lineageTracer != null && res.result != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
- tOut.synthetic = tIn.synthetic;
- lineageTracer.insert(tOut);
- lineageTracer.union(tOut, tIn);
- res.result = tOut;
- }
return res;
}
if(res.returnStatus==POStatus.STATUS_EOP) {
@@ -247,18 +244,18 @@ public class POForEach extends PhysicalO
}
attachInputToPlans((Tuple) inp.result);
- Tuple tuple = (Tuple)inp.result;
+ inpTuple = (Tuple)inp.result;
for (PhysicalOperator po : opsToBeReset) {
po.reset();
}
if (isAccumulative()) {
- for(int i=0; i<tuple.size(); i++) {
- if (tuple.getType(i) == DataType.BAG) {
+ for(int i=0; i<inpTuple.size(); i++) {
+ if (inpTuple.getType(i) == DataType.BAG) {
// we only need to check one bag, because all the bags
// share the same buffer
- buffer = ((AccumulativeBag)tuple.get(i)).getTuplebuffer();
+ buffer = ((AccumulativeBag)inpTuple.get(i)).getTuplebuffer();
break;
}
}
@@ -272,8 +269,9 @@ public class POForEach extends PhysicalO
}
setAccumStart();
- }else{
- buffer.clear();
+ }else{
+ inpTuple = ((POPackage.POPackageTupleBuffer) buffer).illustratorMarkup(null, inpTuple, 0);
+ // buffer.clear();
setAccumEnd();
}
@@ -292,16 +290,6 @@ public class POForEach extends PhysicalO
}
processingPlan = true;
-
- if(lineageTracer != null && res.result != null) {
- //we check for res.result since that can also be null in the case of flatten
- tIn = (ExampleTuple) inp.result;
- ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
- tOut.synthetic = tIn.synthetic;
- lineageTracer.insert(tOut);
- lineageTracer.union(tOut, tIn);
- res.result = tOut;
- }
return res;
}
@@ -482,12 +470,10 @@ public class POForEach extends PhysicalO
} else
out.append(in);
}
-
- if(lineageTracer != null) {
- ExampleTuple tOut = new ExampleTuple();
- tOut.reference(out);
- }
- return out;
+ if (inpTuple != null)
+ return illustratorMarkup(inpTuple, out, 0);
+ else
+ return illustratorMarkup2(data, out);
}
@@ -699,4 +685,43 @@ public class POForEach extends PhysicalO
public void setOpsToBeReset(List<PhysicalOperator> opsToBeReset) {
this.opsToBeReset = opsToBeReset;
}
+
+ /**
+  * Illustrator markup for an output built from several inputs. The output
+  * is wrapped in an {@link ExampleTuple}, inserted into the lineage and
+  * unioned with every element of {@code in}; the wrapper is synthetic when
+  * any input is. The wrapper joins equivalence class 0 only when every one
+  * of the first {@code noItems} entries of {@code bags} holds at least two
+  * tuples and the equivalence classes are not shared.
+  *
+  * @param in  the inputs; each element is cast to {@link ExampleTuple}
+  * @param out the output tuple
+  * @return the {@link ExampleTuple} wrapper, or {@code out} itself when no
+  *         illustrator is attached
+  */
+ private Tuple illustratorMarkup2(Object[] in, Object out) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     ExampleTuple tOut = new ExampleTuple((Tuple) out);
+     illustrator.getLineage().insert(tOut);
+     boolean anySynthetic = false;
+     for (int k = 0; k < in.length; ++k) {
+         Object source = in[k];
+         anySynthetic |= ((ExampleTuple) source).synthetic;
+         illustrator.getLineage().union(tOut, (Tuple) source);
+     }
+     illustrator.addData(tOut);
+     // Look for a bag with fewer than two tuples; finding one rules the
+     // output out of equivalence class 0.
+     boolean everyBagHasTwo = true;
+     for (int k = 0; k < noItems; ++k) {
+         if (((DataBag) bags[k]).size() < 2) {
+             everyBagHasTwo = false;
+             break;
+         }
+     }
+     if (everyBagHasTwo && !illustrator.getEqClassesShared()) {
+         illustrator.getEquivalenceClasses().get(0).add(tOut);
+     }
+     tOut.synthetic = anySynthetic;
+     return tOut;
+ }
+
+ /**
+  * Illustrator markup for an output produced from a single input tuple. The
+  * output is wrapped in an {@link ExampleTuple}, recorded as data, added to
+  * equivalence class 0 unless the classes are shared, inserted into the
+  * lineage and unioned with the input. The wrapper copies the input's
+  * synthetic flag.
+  *
+  * @param in           the input tuple; cast to {@link ExampleTuple}
+  * @param out          the output tuple
+  * @param eqClassIndex unused; class 0 is always the one considered
+  * @return the {@link ExampleTuple} wrapper, or {@code out} itself when no
+  *         illustrator is attached
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     ExampleTuple tOut = new ExampleTuple((Tuple) out);
+     illustrator.addData(tOut);
+     if (!illustrator.getEqClassesShared()) {
+         illustrator.getEquivalenceClasses().get(0).add(tOut);
+     }
+     LineageTracer tracer = illustrator.getLineage();
+     tracer.insert(tOut);
+     ExampleTuple source = (ExampleTuple) in;
+     tOut.synthetic = source.synthetic;
+     tracer.union(source, tOut);
+     return tOut;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POGlobalRearrange.java Mon Dec 13 19:11:00 2010
@@ -105,4 +105,9 @@ public class POGlobalRearrange extends P
// TODO Auto-generated method stub
return super.getNext(t);
}
+
+ /**
+  * Illustrator hook; this operator records nothing.
+  *
+  * @param in           unused
+  * @param out          unused
+  * @param eqClassIndex unused
+  * @return always {@code null}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     // Deliberate no-op.
+     return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLimit.java Mon Dec 13 19:11:00 2010
@@ -17,6 +17,7 @@
*/
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
+import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -32,6 +33,9 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
public class POLimit extends PhysicalOperator {
/**
@@ -87,7 +91,9 @@ public class POLimit extends PhysicalOpe
|| inp.returnStatus == POStatus.STATUS_ERR)
break;
- if (soFar>=mLimit)
+ illustratorMarkup(inp.result, null, 0);
+ // illustrator ignore LIMIT before the post processing
+ if ((illustrator == null || illustrator.getOriginalLimit() != -1) && soFar>=mLimit)
inp.returnStatus = POStatus.STATUS_EOP;
soFar++;
@@ -131,4 +137,14 @@ public class POLimit extends PhysicalOpe
newLimit.setAlias(alias);
return newLimit;
}
+
+ /**
+  * Records the input tuple with the illustrator: it is added to the
+  * equivalence class selected by {@code eqClassIndex} and to the
+  * illustrator's data. Without an illustrator the input is simply returned.
+  *
+  * @param in           the input tuple; cast to {@link ExampleTuple} when an
+  *                     illustrator is attached
+  * @param out          unused
+  * @param eqClassIndex index of the equivalence class to add the tuple to
+  * @return {@code in} cast to {@link Tuple}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) in;
+     }
+     ExampleTuple example = (ExampleTuple) in;
+     illustrator.getEquivalenceClasses().get(eqClassIndex).add(example);
+     illustrator.addData((Tuple) in);
+     return (Tuple) in;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLoad.java Mon Dec 13 19:11:00 2010
@@ -135,10 +135,9 @@ public class POLoad extends PhysicalOper
}
else
res.returnStatus = POStatus.STATUS_OK;
- if(lineageTracer != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
- res.result = tOut;
- }
+
+ if (res.returnStatus == POStatus.STATUS_OK)
+ res.result = illustratorMarkup(res, res.result, 0);
} catch (IOException e) {
log.error("Received error from loader function: " + e);
return res;
@@ -199,4 +198,37 @@ public class POLoad extends PhysicalOper
public LoadFunc getLoadFunc(){
return this.loader;
}
+
+ /**
+  * Illustrator markup for a freshly loaded tuple.
+  *
+  * When the illustrator's ceiling check fails, the load is cut short: the
+  * {@link Result} passed as {@code in} is switched to
+  * {@code POStatus.STATUS_EOP} and {@code null} is returned. Otherwise the
+  * tuple is wrapped in an {@link ExampleTuple} and recorded (lineage, data,
+  * equivalence class) only when the illustrator has no schema or the tuple
+  * has at least as many fields as the schema, and none of its fields is
+  * null. In every other case the tuple is returned untouched.
+  *
+  * @param in           the {@link Result} whose status is set on a failed
+  *                     ceiling check
+  * @param out          the loaded tuple
+  * @param eqClassIndex index of the equivalence class a recorded tuple joins
+  * @return the {@link ExampleTuple} wrapper, the original tuple, or
+  *         {@code null} when the ceiling check fails
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     if (!illustrator.ceilingCheck()) {
+         ((Result) in).returnStatus = POStatus.STATUS_EOP;
+         return null;
+     }
+     Tuple loaded = (Tuple) out;
+     if (illustrator.getSchema() != null && illustrator.getSchema().size() > loaded.size()) {
+         // Fewer fields than the schema: leave the tuple unrecorded.
+         return loaded;
+     }
+     boolean hasNull = false;
+     for (int i = 0; i < loaded.size() && !hasNull; ++i) {
+         try {
+             hasNull = loaded.get(i) == null;
+         } catch (ExecException e) {
+             // A field that cannot be read is treated like a null field.
+             hasNull = true;
+         }
+     }
+     if (hasNull) {
+         return loaded;
+     }
+     ExampleTuple tOut = new ExampleTuple(loaded);
+     illustrator.getLineage().insert(tOut);
+     illustrator.addData((Tuple) tOut);
+     illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+     return tOut;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Mon Dec 13 19:11:00 2010
@@ -41,6 +41,7 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.plan.PlanException;
+import org.apache.pig.pen.util.ExampleTuple;
/**
* The local rearrange operator is a part of the co-group
@@ -380,6 +381,7 @@ public class POLocalRearrange extends Ph
if(secondaryPlans != null)
detachPlans(secondaryPlans);
+ res.result = illustratorMarkup(inp.result, res.result, 0);
return res;
}
return inp;
@@ -445,7 +447,10 @@ public class POLocalRearrange extends Ph
//Put the key and the indexed tuple
//in a tuple and return
lrOutput.set(1, key);
- lrOutput.set(2, mFakeTuple);
+ if (illustrator != null)
+ lrOutput.set(2, key);
+ else
+ lrOutput.set(2, mFakeTuple);
return lrOutput;
} else if(isCross){
@@ -486,6 +491,7 @@ public class POLocalRearrange extends Ph
minimalValue.append(value.get(i));
}
}
+ minimalValue = illustratorMarkup(value, minimalValue, -1);
} else {
// for the project star case
// we would send out an empty tuple as
@@ -799,4 +805,19 @@ public class POLocalRearrange extends Ph
this.stripKeyFromValue = stripKeyFromValue;
}
+ /**
+  * Illustrator hook: wraps an output tuple that is not yet an {@link ExampleTuple},
+  * records it, links its lineage to {@code in} and copies the {@code synthetic}
+  * flag from {@code in}.
+  *
+  * @param in the input tuple; cast to {@link ExampleTuple} to read its synthetic flag
+  * @param out the output tuple to wrap
+  * @param eqClassIndex not used by this implementation
+  * @return the new wrapper, or {@code out} itself when no illustrator is attached
+  *         or {@code out} is already an {@link ExampleTuple}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null || out instanceof ExampleTuple) {
+         return (Tuple) out;
+     }
+     ExampleTuple wrapped = new ExampleTuple((Tuple) out);
+     illustrator.getLineage().insert(wrapped);
+     illustrator.addData(wrapped);
+     illustrator.getLineage().union(wrapped, (Tuple) in);
+     wrapped.synthetic = ((ExampleTuple) in).synthetic;
+     return wrapped;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Mon Dec 13 19:11:00 2010
@@ -22,6 +22,8 @@ import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.PriorityQueue;
import java.util.Properties;
@@ -53,7 +55,10 @@ import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
public class POMergeCogroup extends PhysicalOperator {
@@ -279,7 +284,7 @@ public class POMergeCogroup extends Phys
for(int i=0; i < relationCnt; i++)
out.set(i+1,(outBags[i]));
- return new Result(POStatus.STATUS_OK, out);
+ return new Result(POStatus.STATUS_OK, illustratorMarkup(null, out, -1));
}
@@ -592,4 +597,38 @@ public class POMergeCogroup extends Phys
System.out.println(i+++"th item in heap: "+ copy.poll());
}
}
+
+ /**
+  * Illustrator hook for a merge-cogroup output tuple.
+  * <p>
+  * Wraps {@code out} in an {@link ExampleTuple}, unions its lineage with every tuple
+  * found in the output bags, and adds each of those tuples to the equivalence class
+  * of the relation it came from. The wrapper is marked synthetic as soon as any bag
+  * member is synthetic.
+  *
+  * @param in not used by this implementation
+  * @param out the cogroup output tuple; fields 1..relationCnt are read as bags
+  * @param eqClassIndex not used; bag {@code i} feeds equivalence class {@code i-1}
+  * @return the wrapper, or {@code out} itself when no illustrator is attached
+  * @throws RuntimeException wrapping the {@link ExecException} raised while reading a bag
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     ExampleTuple tOut = new ExampleTuple((Tuple) out);
+     LineageTracer lineageTracer = illustrator.getLineage();
+     // register the wrapper: it is the tuple passed to union() below and returned
+     lineageTracer.insert(tOut);
+     boolean synthetic = false;
+     try {
+         // the bags occupy fields 1..relationCnt, so the upper bound is inclusive
+         for (int i = 1; i <= relationCnt; i++) {
+             DataBag dbs = (DataBag) ((Tuple) out).get(i);
+             Iterator<Tuple> iter = dbs.iterator();
+             while (iter.hasNext()) {
+                 Tuple tmp = iter.next();
+                 // any of synthetic data in bags causes the output tuple to be synthetic
+                 if (!synthetic && ((ExampleTuple) tmp).synthetic) {
+                     synthetic = true;
+                 }
+                 lineageTracer.union(tOut, tmp);
+                 // TODO constraint of >=2 tuples per eq. class
+                 illustrator.getEquivalenceClasses().get(i - 1).add(tmp);
+             }
+         }
+     } catch (ExecException e) {
+         // TODO better exception handling
+         // keep the original exception as the cause so its stack trace is not lost
+         throw new RuntimeException("Illustrator exception :" + e.getMessage(), e);
+     }
+     tOut.synthetic = synthetic;
+     illustrator.addData((Tuple) tOut);
+     return tOut;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Mon Dec 13 19:11:00 2010
@@ -20,6 +20,8 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.io.ObjectInputStream;
import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -37,6 +39,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
+import org.apache.pig.data.DataBag;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.data.TupleFactory;
@@ -47,7 +50,10 @@ import org.apache.pig.impl.plan.NodeIdGe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.PlanException;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.MultiMap;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
/** This operator implements merge join algorithm to do map side joins.
* Currently, only two-way joins are supported. One input of join is identified as left
@@ -204,6 +210,7 @@ public class POMergeJoin extends Physica
for(int i=0; i < rightTupSize; i++)
joinedTup.set(i+leftTupSize, curJoiningRightTup.get(i));
+ joinedTup = illustratorMarkup(null, joinedTup, 0);
return new Result(POStatus.STATUS_OK, joinedTup);
}
// Join with current right input has ended. But bag of left tuples
@@ -555,4 +562,28 @@ public class POMergeJoin extends Physica
public String getIndexFile() {
return indexFile;
}
+
+ /**
+  * Illustrator hook for a joined tuple.
+  * <p>
+  * Wraps {@code out} in an {@link ExampleTuple}, copies the synthetic flag from
+  * {@code out}, and for every field index below {@code leftTupSize + rightTupSize}
+  * unions the wrapper's lineage with that field and adds the wrapper to the
+  * equivalence class at the same index.
+  *
+  * @param in not used by this implementation
+  * @param out the joined tuple
+  * @param eqClassIndex not used by this implementation
+  * @return the wrapper, or {@code out} itself when no illustrator is attached
+  * @throws RuntimeException wrapping the {@link ExecException} raised while reading a field
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     ExampleTuple tOut = new ExampleTuple((Tuple) out);
+     // NOTE(review): this cast requires the joined tuple to already be an ExampleTuple - confirm with the caller
+     tOut.synthetic = ((ExampleTuple) out).synthetic;
+     LineageTracer lineageTracer = illustrator.getLineage();
+     lineageTracer.insert(tOut);
+     try {
+         for (int i = 0; i < leftTupSize + rightTupSize; i++) {
+             // NOTE(review): each joined field is cast to Tuple, and the field index doubles as the
+             // equivalence-class index - confirm this is intended
+             lineageTracer.union(tOut, (Tuple) tOut.get(i));
+             illustrator.getEquivalenceClasses().get(i).add((Tuple) tOut);
+             // TODO constraint of >=2 tuples per eq. class
+         }
+     } catch (ExecException e) {
+         // TODO better exception handling
+         // keep the original exception as the cause so its stack trace is not lost
+         throw new RuntimeException("Illustrator exception :" + e.getMessage(), e);
+     }
+     illustrator.addData((Tuple) tOut);
+     return tOut;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Mon Dec 13 19:11:00 2010
@@ -270,7 +270,7 @@ public class POMultiQueryPackage extends
myObj.setIndex(origIndex);
tuple.set(0, myObj);
}
-
+ // illustrator markup has been handled by "pack"
return res;
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PONative.java Mon Dec 13 19:11:00 2010
@@ -22,6 +22,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.util.Utils;
+import org.apache.pig.data.Tuple;
public class PONative extends PhysicalOperator {
@@ -72,4 +73,8 @@ public class PONative extends PhysicalOp
return false;
}
+ /**
+ * Illustrator hook. This operator performs no markup: the arguments are ignored
+ * and {@code null} is always returned.
+ */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.List;
+import java.util.LinkedList;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.DataType;
@@ -31,6 +32,8 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+import org.apache.pig.impl.util.IdentityHashSet;
/**
* A specialized version of POForeach with the difference
@@ -97,13 +100,6 @@ public class POOptimizedForEach extends
while(true) {
res = processPlan();
if(res.returnStatus==POStatus.STATUS_OK) {
- if(lineageTracer != null && res.result != null) {
- ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
- tOut.synthetic = tIn.synthetic;
- lineageTracer.insert(tOut);
- lineageTracer.union(tOut, tIn);
- res.result = tOut;
- }
return res;
}
if(res.returnStatus==POStatus.STATUS_EOP) {
@@ -132,16 +128,6 @@ public class POOptimizedForEach extends
res = processPlan();
processingPlan = true;
-
- if(lineageTracer != null && res.result != null) {
- //we check for res.result since that can also be null in the case of flatten
- tIn = (ExampleTuple) input;
- ExampleTuple tOut = new ExampleTuple((Tuple) res.result);
- tOut.synthetic = tIn.synthetic;
- lineageTracer.insert(tOut);
- lineageTracer.union(tOut, tIn);
- res.result = tOut;
- }
return res;
}
@@ -171,6 +157,4 @@ public class POOptimizedForEach extends
NodeIdGenerator.getGenerator().getNextNodeId(mKey.scope)),
requestedParallelism, plans, flattens);
}
-
-
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackage.java Mon Dec 13 19:11:00 2010
@@ -21,11 +21,10 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.data.AccumulativeBag;
import org.apache.pig.data.BagFactory;
@@ -44,7 +43,11 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
+
/**
* The package operator that packages
* the globally rearranged tuples into
@@ -112,8 +115,6 @@ public class POPackage extends PhysicalO
// "value" to column numbers in the "key" which contain the fields in
// the "value"
protected Map<Integer, Pair<Boolean, Map<Integer, Integer>>> keyInfo;
-
- transient private final Log log = LogFactory.getLog(getClass());
protected static final BagFactory mBagFactory = BagFactory.getInstance();
protected static final TupleFactory mTupleFactory = TupleFactory.getInstance();
@@ -304,10 +305,13 @@ public class POPackage extends PhysicalO
res.set(i+1,bag);
}
}
- detachInput();
Result r = new Result();
- r.result = res;
r.returnStatus = POStatus.STATUS_OK;
+ if (!isAccumulative())
+ r.result = illustratorMarkup(null, res, 0);
+ else
+ r.result = res;
+ detachInput();
return r;
}
@@ -355,19 +359,19 @@ public class POPackage extends PhysicalO
}
}
}
-
+ copy = illustratorMarkup2(val, copy);
} else if (isProjectStar) {
// the whole "value" is present in the "key"
copy = mTupleFactory.newTuple(keyAsTuple.getAll());
-
+ copy = illustratorMarkup2(keyAsTuple, copy);
} else {
// there is no field of the "value" in the
// "key" - so just make a copy of what we got
// as the "value"
copy = mTupleFactory.newTuple(val.getAll());
-
+ copy = illustratorMarkup2(val, copy);
}
return copy;
}
@@ -462,7 +466,7 @@ public class POPackage extends PhysicalO
return pkgType;
}
- private class POPackageTupleBuffer implements AccumulativeTupleBuffer {
+ class POPackageTupleBuffer implements AccumulativeTupleBuffer {
private List<Tuple>[] bags;
private Iterator<NullableTuple> iter;
private int batchSize;
@@ -529,5 +533,79 @@ public class POPackage extends PhysicalO
public Iterator<Tuple> getTuples(int index) {
return bags[index].iterator();
}
+
+ /**
+ * Forwards to the enclosing POPackage's illustratorMarkup with the same arguments
+ * and returns its result.
+ */
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return POPackage.this.illustratorMarkup(in, out, eqClassIndex);
+ }
};
+
+ /**
+  * Wraps a copied value tuple for the illustrator: the wrapper is inserted into the
+  * lineage tracer, inherits the {@code synthetic} flag of {@code in}, and is unioned
+  * with {@code in}.
+  *
+  * @param in the source tuple; cast to {@link ExampleTuple} to read its synthetic flag
+  * @param out the copy to wrap
+  * @return the wrapper, or {@code out} itself when no illustrator is attached
+  */
+ private Tuple illustratorMarkup2(Object in, Object out) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     ExampleTuple wrapped = new ExampleTuple((Tuple) out);
+     illustrator.getLineage().insert(wrapped);
+     wrapped.synthetic = ((ExampleTuple) in).synthetic;
+     illustrator.getLineage().union(wrapped, (Tuple) in);
+     return wrapped;
+ }
+
+ /**
+  * Illustrator hook for a packaged output tuple.
+  * <p>
+  * Wraps {@code out} in an {@link ExampleTuple}, registers it with the lineage tracer
+  * and creates one empty equivalence class per input if the illustrator has none yet.
+  * <ul>
+  * <li>In the {@code distinct} case the remaining entries of {@code tupIter} are
+  * consumed; each value that is not equal to the wrapper is unioned with it, and the
+  * wrapper joins the equivalence class only when more than one entry was seen.</li>
+  * <li>Otherwise the bags in fields 1..numInputs are walked; every member is unioned
+  * with the wrapper, the wrapper becomes synthetic if any member is synthetic, and it
+  * joins the equivalence class only when every bag holds at least two tuples.</li>
+  * </ul>
+  *
+  * @param in not used by this implementation
+  * @param out the packaged output tuple
+  * @param eqClassIndex index of the equivalence class that may receive the wrapper
+  * @return the wrapper, or {@code out} itself when no illustrator is attached
+  * @throws RuntimeException wrapping the {@link ExecException} raised while reading a bag
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator != null) {
+         ExampleTuple tOut = new ExampleTuple((Tuple) out);
+         LineageTracer lineageTracer = illustrator.getLineage();
+         lineageTracer.insert(tOut);
+         Tuple tmp;
+         boolean synthetic = false;
+         if (illustrator.getEquivalenceClasses() == null) {
+             // first call: create one empty equivalence class per input
+             LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+             for (int i = 0; i < numInputs; ++i) {
+                 IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+                 equivalenceClasses.add(equivalenceClass);
+             }
+             illustrator.setEquivalenceClasses(equivalenceClasses, this);
+         }
+
+         if (distinct) {
+             int count;
+             for (count = 0; tupIter.hasNext(); ++count) {
+                 NullableTuple ntp = tupIter.next();
+                 tmp = (Tuple) ntp.getValueAsPigType();
+                 if (!tmp.equals(tOut))
+                     lineageTracer.union(tOut, tmp);
+             }
+             if (count > 1) // only non-distinct tuples are inserted into the equivalence class
+                 illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+             illustrator.addData((Tuple) tOut);
+             return (Tuple) tOut;
+         }
+         boolean outInEqClass = true;
+         try {
+             for (int i = 1; i < numInputs + 1; i++) {
+                 DataBag dbs = (DataBag) ((Tuple) out).get(i);
+                 Iterator<Tuple> iter = dbs.iterator();
+                 if (dbs.size() <= 1 && outInEqClass) // all inputs have >= 2 records
+                     outInEqClass = false;
+                 while (iter.hasNext()) {
+                     tmp = iter.next();
+                     // any of synthetic data in bags causes the output tuple to be synthetic
+                     if (!synthetic && ((ExampleTuple) tmp).synthetic)
+                         synthetic = true;
+                     lineageTracer.union(tOut, tmp);
+                 }
+             }
+         } catch (ExecException e) {
+             // TODO better exception handling
+             // keep the original exception as the cause so its stack trace is not lost
+             throw new RuntimeException("Illustrator exception :" + e.getMessage(), e);
+         }
+         if (outInEqClass)
+             illustrator.getEquivalenceClasses().get(eqClassIndex).add(tOut);
+         tOut.synthetic = synthetic;
+         illustrator.addData((Tuple) tOut);
+         return tOut;
+     } else
+         return (Tuple) out;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPackageLite.java Mon Dec 13 19:11:00 2010
@@ -22,10 +22,13 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.HashMap;
import java.util.Map;
import java.util.Map.Entry;
+
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.impl.util.Pair;
import org.apache.pig.PigException;
@@ -42,6 +45,8 @@ import org.apache.pig.impl.io.NullableTu
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
/**
* This package operator is a specialization
@@ -173,8 +178,8 @@ public class POPackageLite extends POPac
res.set(1,db);
detachInput();
Result r = new Result();
- r.result = res;
r.returnStatus = POStatus.STATUS_OK;
+ r.result = illustratorMarkup(null, res, 0);
return r;
}
@@ -202,5 +207,27 @@ public class POPackageLite extends POPac
+ DataType.findTypeName(keyType) + "}" + " - "
+ mKey.toString();
}
+
+ /**
+  * Illustrator hook for the lite package operator: wraps {@code out} in an
+  * {@link ExampleTuple}, registers it with the lineage tracer, creates one empty
+  * equivalence class per input if none exist yet, and records the wrapper both in
+  * the requested equivalence class and as illustrator data.
+  *
+  * @param in not used by this implementation
+  * @param out the packaged output tuple
+  * @param eqClassIndex index of the equivalence class that receives the wrapper
+  * @return the wrapper, or {@code out} itself when no illustrator is attached
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     ExampleTuple wrapped = new ExampleTuple((Tuple) out);
+     LineageTracer tracer = illustrator.getLineage();
+     tracer.insert(wrapped);
+     if (illustrator.getEquivalenceClasses() == null) {
+         LinkedList<IdentityHashSet<Tuple>> classes = new LinkedList<IdentityHashSet<Tuple>>();
+         for (int remaining = numInputs; remaining > 0; --remaining) {
+             classes.add(new IdentityHashSet<Tuple>());
+         }
+         illustrator.setEquivalenceClasses(classes, this);
+     }
+     illustrator.getEquivalenceClasses().get(eqClassIndex).add(wrapped);
+     // the flag is always cleared here; it is not expected to be consulted
+     wrapped.synthetic = false;
+     illustrator.addData((Tuple) wrapped);
+     return wrapped;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POPreCombinerLocalRearrange.java Mon Dec 13 19:11:00 2010
@@ -234,4 +234,8 @@ public class POPreCombinerLocalRearrange
}
}
+ /**
+ * Illustrator hook. This operator performs no markup: the arguments are ignored
+ * and {@code null} is always returned.
+ */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PORead.java Mon Dec 13 19:11:00 2010
@@ -103,4 +103,8 @@ public class PORead extends PhysicalOper
v.visitRead(this);
}
+ /**
+ * Illustrator hook. This operator performs no markup: the arguments are ignored
+ * and {@code null} is always returned.
+ */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSkewedJoin.java Mon Dec 13 19:11:00 2010
@@ -27,6 +27,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhysicalPlan;
import org.apache.pig.data.DataType;
+import org.apache.pig.data.Tuple;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
@@ -121,4 +122,8 @@ public class POSkewedJoin extends Physic
return inputSchema.get(i);
}
+ /**
+ * Illustrator hook. This operator performs no markup: the arguments are ignored
+ * and {@code null} is always returned.
+ */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSort.java Mon Dec 13 19:11:00 2010
@@ -21,6 +21,7 @@ import java.io.Serializable;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -47,6 +48,8 @@ import org.apache.pig.data.TupleFactory;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.NodeIdGenerator;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.LineageTracer;
/**
* This implementation is applicable for both the physical plan and for the
@@ -301,10 +304,7 @@ public class POSort extends PhysicalOper
}
if (it.hasNext()) {
res.result = it.next();
- if(lineageTracer != null) {
- lineageTracer.insert((Tuple) res.result);
- lineageTracer.union((Tuple)res.result, (Tuple)res.result);
- }
+ illustratorMarkup(res.result, res.result, 0);
res.returnStatus = POStatus.STATUS_OK;
} else {
res.returnStatus = POStatus.STATUS_EOP;
@@ -407,4 +407,12 @@ public class POSort extends PhysicalOper
public SortInfo getSortInfo() {
return sortInfo;
}
+
+ /**
+  * Illustrator hook for sorted output: adds {@code in} to the requested equivalence
+  * class and records {@code out} as illustrator data. The tuple itself is not wrapped.
+  *
+  * @param in the tuple added to the equivalence class
+  * @param out the tuple recorded as data and returned
+  * @param eqClassIndex index of the equivalence class that receives {@code in}
+  * @return {@code out}, cast to {@link Tuple}
+  */
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     illustrator.getEquivalenceClasses().get(eqClassIndex).add((Tuple) in);
+     illustrator.addData((Tuple) out);
+     return (Tuple) out;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POSplit.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.util.ArrayList;
import java.util.BitSet;
+import java.util.LinkedList;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -33,6 +34,9 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
/**
* The MapReduce Split operator.
@@ -314,5 +318,11 @@ public class POSplit extends PhysicalOpe
return res;
}
+
+ /**
+ * Illustrator hook. This operator performs no markup: the arguments are ignored
+ * and {@code null} is always returned.
+ */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+ // no op
+ return null;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStore.java Mon Dec 13 19:11:00 2010
@@ -19,6 +19,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.List;
+import java.util.LinkedList;
import org.apache.hadoop.mapreduce.Counter;
import org.apache.pig.PigException;
@@ -36,6 +37,9 @@ import org.apache.pig.impl.io.FileSpec;
import org.apache.pig.impl.logicalLayer.schema.Schema;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
/**
* The store operator which is used in two ways:
@@ -132,8 +136,12 @@ public class POStore extends PhysicalOpe
try {
switch (res.returnStatus) {
case POStatus.STATUS_OK:
- storer.putNext((Tuple)res.result);
+ if (illustrator == null) {
+ storer.putNext((Tuple)res.result);
+ } else
+ illustratorMarkup(res.result, res.result, 0);
res = empty;
+
if (outputRecordCounter != null) {
outputRecordCounter.increment(1);
}
@@ -250,4 +258,17 @@ public class POStore extends PhysicalOpe
public boolean isMultiStore() {
return isMultiStore;
}
+
+ /**
+  * Illustrator hook for the store operator: registers the incoming tuple with the
+  * lineage tracer, adds it to the requested equivalence class unless this is a
+  * temporary store, and records {@code out} as illustrator data.
+  *
+  * @param in the tuple being stored; cast to {@link ExampleTuple}
+  * @param out the tuple recorded as data and returned
+  * @param eqClassIndex index of the equivalence class used for non-temporary stores
+  * @return {@code out}, cast to {@link Tuple}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     ExampleTuple stored = (ExampleTuple) in;
+     illustrator.getLineage().insert(stored);
+     if (!isTmpStore) {
+         illustrator.getEquivalenceClasses().get(eqClassIndex).add(stored);
+     }
+     illustrator.addData((Tuple) out);
+     return (Tuple) out;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POStream.java Mon Dec 13 19:11:00 2010
@@ -20,6 +20,7 @@ package org.apache.pig.backend.hadoop.ex
import java.io.IOException;
import java.util.Iterator;
+import java.util.LinkedList;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
@@ -31,6 +32,9 @@ import org.apache.pig.impl.plan.Operator
import org.apache.pig.impl.plan.VisitorException;
import org.apache.pig.impl.streaming.ExecutableManager;
import org.apache.pig.impl.streaming.StreamingCommand;
+import org.apache.pig.impl.util.IdentityHashSet;
+import org.apache.pig.pen.util.ExampleTuple;
+import org.apache.pig.pen.util.LineageTracer;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -158,7 +162,8 @@ public class POStream extends PhysicalOp
// we don't need to set any flag noting we saw all output
// from binary
r = EOP_RESULT;
- }
+ } else if (r.returnStatus == POStatus.STATUS_OK)
+ illustratorMarkup(r.result, r.result, 0);
return(r);
}
@@ -207,7 +212,8 @@ public class POStream extends PhysicalOp
// should never be called. So we don't need to set any
// flag noting we saw all output from binary
r = EOP_RESULT;
- }
+ } else if (r.returnStatus == POStatus.STATUS_OK)
+ illustratorMarkup(r.result, r.result, 0);
return r;
} else {
// we are not being called from close() - so
@@ -222,7 +228,8 @@ public class POStream extends PhysicalOp
// for future calls
r = EOP_RESULT;
allOutputFromBinaryProcessed = true;
- }
+ } else if (r.returnStatus == POStatus.STATUS_OK)
+ illustratorMarkup(r.result, r.result, 0);
return r;
}
@@ -348,4 +355,14 @@ public class POStream extends PhysicalOp
public BlockingQueue<Result> getBinaryOutputQueue() {
return binaryOutputQueue;
}
+
+ /**
+  * Illustrator hook for the stream operator: adds {@code in} to the requested
+  * equivalence class and records {@code out} as illustrator data.
+  *
+  * @param in the tuple added to the equivalence class; cast to {@link ExampleTuple}
+  * @param out the tuple recorded as data and returned
+  * @param eqClassIndex index of the equivalence class that receives {@code in}
+  * @return {@code out}, cast to {@link Tuple}
+  */
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) {
+     if (illustrator == null) {
+         return (Tuple) out;
+     }
+     ExampleTuple streamed = (ExampleTuple) in;
+     illustrator.getEquivalenceClasses().get(eqClassIndex).add(streamed);
+     illustrator.addData((Tuple) out);
+     return (Tuple) out;
+ }
}
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POUnion.java Mon Dec 13 19:11:00 2010
@@ -18,6 +18,7 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.BitSet;
+import java.util.LinkedList;
import java.util.List;
import org.apache.pig.backend.executionengine.ExecException;
@@ -29,6 +30,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.Result;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.IdentityHashSet;
import org.apache.pig.pen.util.ExampleTuple;
/**
@@ -165,6 +167,7 @@ public class POUnion extends PhysicalOpe
if(res.returnStatus == POStatus.STATUS_OK ||
res.returnStatus == POStatus.STATUS_NULL || res.returnStatus == POStatus.STATUS_ERR) {
+ illustratorMarkup(res.result, res.result, ind);
return res;
}
@@ -181,14 +184,29 @@ public class POUnion extends PhysicalOpe
res.returnStatus = POStatus.STATUS_OK;
detachInput();
nextReturnEOP = true ;
- if(lineageTracer != null) {
- ExampleTuple tOut = (ExampleTuple) res.result;
- lineageTracer.insert(tOut);
- lineageTracer.union(tOut, tOut);
- }
+ illustratorMarkup(res.result, res.result, 0);
return res;
}
}
+
+ @Override
+ public Tuple illustratorMarkup(Object in, Object out, int eqClassIndex) { // Illustrator hook: records in/out tuples, creating the equivalence classes on first use.
+ if(illustrator != null) {
+ if (illustrator.getEquivalenceClasses() == null) { // nothing set up yet: build the list now
+ int size = (inputs == null ? 1 : inputs.size()); // one class per input; a single class when inputs is null
+ LinkedList<IdentityHashSet<Tuple>> equivalenceClasses = new LinkedList<IdentityHashSet<Tuple>>();
+ for (int i = 0; i < size; ++i) {
+ IdentityHashSet<Tuple> equivalenceClass = new IdentityHashSet<Tuple>();
+ equivalenceClasses.add(equivalenceClass);
+ }
+ illustrator.setEquivalenceClasses(equivalenceClasses, this);
+ }
+ ExampleTuple tIn = (ExampleTuple) in; // plain cast: throws ClassCastException unless 'in' is an ExampleTuple or null
+ illustrator.getEquivalenceClasses().get(eqClassIndex).add(tIn); // the call sites in this class pass 'ind' or 0 as eqClassIndex
+ illustrator.addData((Tuple) out);
+ }
+ return null; // always null, even when 'out' is non-null; the call sites visible in this class discard the result
+ }
}
Modified: pig/trunk/src/org/apache/pig/data/AccumulativeBag.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/AccumulativeBag.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/AccumulativeBag.java (original)
+++ pig/trunk/src/org/apache/pig/data/AccumulativeBag.java Mon Dec 13 19:11:00 2010
@@ -68,7 +68,9 @@ public class AccumulativeBag implements
}
public long size() {
- throw new RuntimeException("AccumulativeBag does not support size() operation");
+ int size = 0; // int counter; widened to long by the return below
+ for (Iterator<Tuple> it = iterator(); it.hasNext(); it.next(), ++size); // counts by walking iterator() to the end; the loop body is deliberately empty
+ return size;
}
public long getMemorySize() {
Modified: pig/trunk/src/org/apache/pig/data/TupleFactory.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/data/TupleFactory.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/data/TupleFactory.java (original)
+++ pig/trunk/src/org/apache/pig/data/TupleFactory.java Mon Dec 13 19:11:00 2010
@@ -75,6 +75,24 @@ public abstract class TupleFactory {
throw new RuntimeException("Unable to instantiate "
+ "tuple factory " + factoryName, e);
}
+ } else if (factoryName != null) {
+ try {
+ Class c = Class.forName(factoryName);
+ Object o = c.newInstance();
+ if (!(o instanceof TupleFactory)) {
+ throw new RuntimeException("Provided factory " +
+ factoryName + " does not extend TupleFactory!");
+ }
+ gSelf = (TupleFactory)o;
+ } catch (Exception e) {
+ if (e instanceof RuntimeException) {
+ // We just threw this
+ RuntimeException re = (RuntimeException)e;
+ throw re;
+ }
+ throw new RuntimeException("Unable to instantiate "
+ + "tuple factory " + factoryName, e);
+ }
} else {
gSelf = new BinSedesTupleFactory();
}
Modified: pig/trunk/src/org/apache/pig/impl/PigContext.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/PigContext.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/PigContext.java (original)
+++ pig/trunk/src/org/apache/pig/impl/PigContext.java Mon Dec 13 19:11:00 2010
@@ -115,11 +115,14 @@ public class PigContext implements Seria
public int defaultParallel = -1;
- // Says, wether we're processing an explain right now. Explain
+ // Says, whether we're processing an explain right now. Explain
// might skip some check in the logical plan validation (file
// existence checks, etc).
public boolean inExplain = false;
+ // whether we're processing an ILLUSTRATE right now.
+ public boolean inIllustrator = false;
+
private String last_alias = null;
// List of paths skipped for automatic shipping
Modified: pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java (original)
+++ pig/trunk/src/org/apache/pig/impl/builtin/ReadScalars.java Mon Dec 13 19:11:00 2010
@@ -18,16 +18,17 @@
package org.apache.pig.impl.builtin;
import java.io.IOException;
+import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.pig.EvalFunc;
import org.apache.pig.backend.executionengine.ExecException;
-import org.apache.pig.backend.hadoop.executionengine.mapReduceLayer.PigMapReduce;
import org.apache.pig.data.DataType;
import org.apache.pig.data.Tuple;
import org.apache.pig.impl.io.InterStorage;
import org.apache.pig.impl.io.ReadToEndLoader;
import org.apache.pig.impl.util.UDFContext;
+import org.apache.pig.data.DataBag;
/**
* ReadScalars reads a line from a file and returns it as its value. The
@@ -39,6 +40,9 @@ public class ReadScalars extends EvalFun
private String scalarfilename = null;
// private String charset = "UTF-8";
private Object value = null;
+
+ // in-core input : used by illustrator
+ private Map<String, DataBag> inputBuffer = null;
/**
* Java level API
@@ -54,6 +58,24 @@ public class ReadScalars extends EvalFun
return null;
int pos;
+ if (inputBuffer != null)
+ {
+ pos = DataType.toInteger(input.get(0));
+ scalarfilename = DataType.toString(input.get(1));
+ DataBag inputBag = inputBuffer.get(scalarfilename);
+ if (inputBag == null || inputBag.size() ==0)
+ {
+ log.warn("No scalar field to read, returning null");
+ return null;
+ } else if (inputBag.size() > 1) {
+ String msg = "Scalar has more than one row in the output.";
+ throw new ExecException(msg);
+ }
+ Tuple t1 = inputBag.iterator().next();
+ value = t1.get(pos);
+ return value;
+ }
+
ReadToEndLoader loader;
try {
pos = DataType.toInteger(input.get(0));
@@ -92,4 +114,8 @@ public class ReadScalars extends EvalFun
return value;
}
+ public void setOutputBuffer(Map<String, DataBag> inputBuffer) { // Installs the in-core bags (looked up by scalar file name) that are read when inputBuffer is non-null. NOTE(review): named "Output" but stores into inputBuffer - confirm intent.
+ this.inputBuffer = inputBuffer;
+ value = null; // reset the value field along with the buffer
+ }
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/ForeachInnerPlanVisitor.java Mon Dec 13 19:11:00 2010
@@ -105,6 +105,10 @@ public class ForeachInnerPlanVisitor ext
return childPlanVisitor.exprPlan;
}
+ public Map<LogicalOperator, LogicalRelationalOperator> getInnerOpMap() { // Exposes innerOpsMap; the field itself is returned, not a copy.
+ return innerOpsMap;
+ }
+
public void visit(LOProject project) throws VisitorException {
LogicalOperator op = project.getExpression();
Modified: pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/LogicalPlanMigrationVistor.java Mon Dec 13 19:11:00 2010
@@ -60,13 +60,23 @@ import org.apache.pig.newplan.logical.re
public class LogicalPlanMigrationVistor extends LOVisitor {
private org.apache.pig.newplan.logical.relational.LogicalPlan logicalPlan;
private Map<LogicalOperator, LogicalRelationalOperator> opsMap;
+ private Map<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> forEachInnerMap;
public LogicalPlanMigrationVistor(LogicalPlan plan) {
super(plan, new DependencyOrderWalker<LogicalOperator, LogicalPlan>(plan));
logicalPlan = new org.apache.pig.newplan.logical.relational.LogicalPlan();
opsMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+ forEachInnerMap = new HashMap<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>>(); // starts empty; an entry per LOForEach is put elsewhere in this class
}
+ public Map<LogicalOperator, LogicalRelationalOperator> getOldToNewLOOpMap() { // Exposes opsMap (LogicalOperator -> LogicalRelationalOperator); the field itself is returned, not a copy.
+ return opsMap;
+ }
+
+ public Map<LOForEach, Map<LogicalOperator, LogicalRelationalOperator>> getForEachInnerMap() { // Exposes forEachInnerMap (one inner operator map per LOForEach); the field itself is returned, not a copy.
+ return forEachInnerMap;
+ }
+
private void translateConnection(LogicalOperator oldOp, org.apache.pig.newplan.Operator newOp) {
List<LogicalOperator> preds = mPlan.getPredecessors(oldOp);
@@ -227,12 +237,14 @@ public class LogicalPlanMigrationVistor
innerPlan.add(gen);
List<LogicalPlan> ll = forEach.getForEachPlans();
+ Map<LogicalOperator, LogicalRelationalOperator> innerMap = new HashMap<LogicalOperator, LogicalRelationalOperator>();
+ forEachInnerMap.put(forEach, innerMap);
try {
for(int i=0; i<ll.size(); i++) {
LogicalPlan lp = ll.get(i);
ForeachInnerPlanVisitor v = new ForeachInnerPlanVisitor(newForeach, forEach, lp, mPlan, opsMap);
v.visit();
-
+ innerMap.putAll(v.getInnerOpMap());
expPlans.add(v.exprPlan);
}
} catch (FrontendException e) {
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOCogroup.java Mon Dec 13 19:11:00 2010
@@ -267,6 +267,10 @@ public class LOCogroup extends LogicalRe
return mGroupType;
}
+ public void resetGroupType() { // Forces mGroupType to GROUPTYPE.REGULAR, whatever it was before.
+ mGroupType = GROUPTYPE.REGULAR;
+ }
+
/**
* Returns an Unmodifiable Map of Input Number to Uid
* @return Unmodifiable Map<Integer,Long>
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LOJoin.java Mon Dec 13 19:11:00 2010
@@ -82,6 +82,10 @@ public class LOJoin extends LogicalRelat
return mJoinType;
}
+ public void resetJoinType() { // Forces mJoinType to JOINTYPE.HASH, whatever it was before.
+ mJoinType = JOINTYPE.HASH;
+ }
+
public Collection<LogicalExpressionPlan> getJoinPlan(int inputIndex) {
return mJoinPlans.get(inputIndex);
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/relational/LogToPhyTranslationVisitor.java Mon Dec 13 19:11:00 2010
@@ -105,6 +105,10 @@ public class LogToPhyTranslationVisitor
this.pc = pc;
}
+ public Map<Operator, PhysicalOperator> getLogToPhyMap() { // Exposes logToPhyMap (Operator -> PhysicalOperator); the field itself is returned, not a copy.
+ return logToPhyMap;
+ }
+
public PhysicalPlan getPhysicalPlan() {
return currentPlan;
}
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/MergeForEach.java Mon Dec 13 19:11:00 2010
@@ -28,7 +28,6 @@ import java.util.Set;
import org.apache.pig.newplan.Operator;
import org.apache.pig.newplan.OperatorPlan;
import org.apache.pig.newplan.OperatorSubPlan;
-import org.apache.pig.newplan.logical.expression.LogicalExpression;
import org.apache.pig.newplan.logical.expression.LogicalExpressionPlan;
import org.apache.pig.newplan.logical.expression.ProjectExpression;
import org.apache.pig.newplan.logical.relational.LOForEach;
Modified: pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java?rev=1045314&r1=1045313&r2=1045314&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java (original)
+++ pig/trunk/src/org/apache/pig/newplan/logical/rules/TypeCastInserter.java Mon Dec 13 19:11:00 2010
@@ -119,6 +119,8 @@ public abstract class TypeCastInserter e
foreach.setAlias(op.getAlias());
// Insert the foreach into the plan and patch up the plan.
+ if (currentPlan.getSuccessors(op) == null)
+ return;
Operator next = currentPlan.getSuccessors(op).get(0);
currentPlan.insertBetween(op, foreach, next);