You are viewing a plain-text version of this content. (The hyperlink to the canonical version is not preserved in plain text.)
Posted to commits@pig.apache.org by th...@apache.org on 2010/08/14 00:45:47 UTC
svn commit: r985392 - in /hadoop/pig/trunk: ./
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/
Author: thejas
Date: Fri Aug 13 22:45:46 2010
New Revision: 985392
URL: http://svn.apache.org/viewvc?rev=985392&view=rev
Log:
PIG-1448: Detach tuple from inner plans of physical operator (thejas)
Modified:
hadoop/pig/trunk/CHANGES.txt
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
Modified: hadoop/pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/CHANGES.txt?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/CHANGES.txt (original)
+++ hadoop/pig/trunk/CHANGES.txt Fri Aug 13 22:45:46 2010
@@ -26,6 +26,8 @@ PIG-1249: Safe-guards against misconfigu
IMPROVEMENTS
+PIG-1448: Detach tuple from inner plans of physical operator (thejas)
+
PIG-965: PERFORMANCE: optimize common case in matches (PORegex) (ankit.modi
via olgan)
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/PODemux.java Fri Aug 13 22:45:46 2010
@@ -84,6 +84,12 @@ public class PODemux extends PhysicalOpe
* The leaf of the current pipeline
*/
private PhysicalOperator curLeaf = null;
+
+
+ /**
+ * The current pipeline plan
+ */
+ private PhysicalPlan curPlan = null;
/*
* Indicating if this operator is in a combiner.
@@ -192,7 +198,8 @@ public class PODemux extends PhysicalOpe
} else {
if (getNext) {
-
+ if(curPlan != null)
+ curPlan.detachInput();
Result inp = processInput();
if (inp.returnStatus == POStatus.STATUS_EOP) {
@@ -238,6 +245,8 @@ public class PODemux extends PhysicalOpe
if (processedSet.cardinality() == myPlans.size()) {
curLeaf = null;
+ if(curPlan != null)
+ curPlan.detachInput();
Result inp = processInput();
if (inp.returnStatus == POStatus.STATUS_OK) {
attachInputWithIndex((Tuple)inp.result);
@@ -296,22 +305,21 @@ public class PODemux extends PhysicalOpe
// is expected by the inner plans, as well as the index of the associated
// inner plan.
PigNullableWritable fld = (PigNullableWritable)res.get(0);
-
// choose an inner plan to run based on the index set by
// the POLocalRearrange operator and passed to this operator
// by POMultiQueryPackage
int index = fld.getIndex();
index &= idxPart;
-
- PhysicalPlan pl = myPlans.get(index);
- if (!(pl.getRoots().get(0) instanceof PODemux)) {
+
+ curPlan = myPlans.get(index);
+ if (!(curPlan.getRoots().get(0) instanceof PODemux)) {
res.set(0, fld.getValueAsPigType());
}
-
- myPlans.get(index).attachInput(res);
- return myPlans.get(index).getLeaves().get(0);
+
+ curPlan.attachInput(res);
+ return curPlan.getLeaves().get(0);
}
-
+
/**
* Sets a flag indicating if this operator is
* in a combiner.
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFRJoin.java Fri Aug 13 22:45:46 2010
@@ -259,7 +259,7 @@ public class POFRJoin extends PhysicalOp
Tuple key = TupleFactory.getInstance().newTuple(1);
key.set(0, lrOutTuple.get(1));
Tuple value = getValueTuple(lr, lrOutTuple);
-
+ lr.detachInput();
// Configure the for each operator with the relevant bags
int i = -1;
boolean noMatch = false;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POFilter.java Fri Aug 13 22:45:46 2010
@@ -146,6 +146,7 @@ public class POFilter extends PhysicalOp
}
*/
res = comOp.getNext(dummyBool);
+ plan.detachInput();
if (res.returnStatus != POStatus.STATUS_OK
&& res.returnStatus != POStatus.STATUS_NULL)
return res;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POLocalRearrange.java Fri Aug 13 22:45:46 2010
@@ -118,8 +118,7 @@ public class POLocalRearrange extends Ph
private int mProjectedColsMapSize = 0;
private int mSecondaryProjectedColsMapSize = 0;
- private Tuple lrOutput;
-
+
private boolean useSecondaryKey = false;
// By default, we strip keys from the value.
@@ -144,7 +143,6 @@ public class POLocalRearrange extends Ph
secondaryLeafOps = new ArrayList<ExpressionOperator>();
mProjectedColsMap = new HashMap<Integer, Integer>();
mSecondaryProjectedColsMap = new HashMap<Integer, Integer>();
- lrOutput = mTupleFactory.newTuple(3);
}
@Override
@@ -222,7 +220,6 @@ public class POLocalRearrange extends Ph
// indices and hence would go to different invocation of reduce()
this.index = multiQuery ? (byte)(index | PigNullableWritable.mqFlag) : (byte)index;
}
- lrOutput.set(0, Byte.valueOf(this.index));
}
public boolean isDistinct() {
@@ -378,11 +375,22 @@ public class POLocalRearrange extends Ph
res.result = constructLROutput(resLst,secondaryResLst,(Tuple)inp.result);
res.returnStatus = POStatus.STATUS_OK;
+ detachPlans(plans);
+
+ if(secondaryPlans != null)
+ detachPlans(secondaryPlans);
+
return res;
}
return inp;
}
+ private void detachPlans(List<PhysicalPlan> plans) {
+ for (PhysicalPlan ep : plans) {
+ ep.detachInput();
+ }
+ }
+
protected Object getKeyFromResult(List<Result> resLst, byte type) throws ExecException {
Object key;
if(resLst.size()>1){
@@ -412,6 +420,8 @@ public class POLocalRearrange extends Ph
}
protected Tuple constructLROutput(List<Result> resLst, List<Result> secondaryResLst, Tuple value) throws ExecException{
+ Tuple lrOutput = mTupleFactory.newTuple(3);
+ lrOutput.set(0, Byte.valueOf(this.index));
//Construct key
Object key;
Object secondaryKey=null;
@@ -673,13 +683,6 @@ public class POLocalRearrange extends Ph
clone.secondaryKeyType = secondaryKeyType;
clone.useSecondaryKey = useSecondaryKey;
clone.index = index;
- try {
- clone.lrOutput.set(0, index);
- } catch (ExecException e) {
- CloneNotSupportedException cnse = new CloneNotSupportedException("Problem with setting index of output.");
- cnse.initCause(e);
- throw cnse;
- }
// Needs to be called as setDistinct so that the fake index tuple gets
// created.
clone.setDistinct(mIsDistinct);
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeCogroup.java Fri Aug 13 22:45:46 2010
@@ -461,7 +461,7 @@ public class POMergeCogroup extends Phys
String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
throw new ExecException(errMsg,errCode,PigException.BUG);
}
-
+ lr.detachInput();
return mTupleFactory.newTuple(((Tuple)lrOut.result).getAll());
}
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMergeJoin.java Fri Aug 13 22:45:46 2010
@@ -419,6 +419,7 @@ public class POMergeJoin extends Physica
}
} else {
Result res = rightPipelineLeaf.getNext(dummyTuple);
+ rightPipelineLeaf.detachInput();
switch(res.returnStatus){
case POStatus.STATUS_OK:
return res;
@@ -461,6 +462,7 @@ public class POMergeJoin extends Physica
POLocalRearrange lr = LRs[lrIdx];
lr.attachInput((Tuple)inp.result);
Result lrOut = lr.getNext(dummyTuple);
+ lr.detachInput();
if(lrOut.returnStatus!=POStatus.STATUS_OK){
int errCode = 2167;
String errMsg = "LocalRearrange used to extract keys from tuple isn't configured correctly";
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POMultiQueryPackage.java Fri Aug 13 22:45:46 2010
@@ -240,6 +240,7 @@ public class POMultiQueryPackage extends
pack.attachInput(curKey, tupIter);
Result res = pack.getNext(t);
+ pack.detachInput();
Tuple tuple = (Tuple)res.result;
Modified: hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java
URL: http://svn.apache.org/viewvc/hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java?rev=985392&r1=985391&r2=985392&view=diff
==============================================================================
--- hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java (original)
+++ hadoop/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POOptimizedForEach.java Fri Aug 13 22:45:46 2010
@@ -108,6 +108,8 @@ public class POOptimizedForEach extends
}
if(res.returnStatus==POStatus.STATUS_EOP) {
processingPlan = false;
+ for(PhysicalPlan plan : inputPlans)
+ plan.detachInput();
return res;
}
if(res.returnStatus==POStatus.STATUS_ERR) {