You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/06 19:29:35 UTC
svn commit: r1574986 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine: mapReduceLayer/ mapReduceLayer/plans/ physicalLayer/plans/ physicalLayer/relationalOperators/ physicalLayer/util/
Author: cheolsoo
Date: Thu Mar 6 18:29:35 2014
New Revision: 1574986
URL: http://svn.apache.org/r1574986
Log:
PIG-3786: POReservoirSample should handle endOfAllInput flag (daijy)
Modified:
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Mar 6 18:29:35 2014
@@ -64,6 +64,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -330,25 +331,30 @@ public class PhyPlanSetter extends PhyPl
*/
@Override
- public void visitPartialAgg(POPartialAgg poPartialAgg) {
+ public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
poPartialAgg.setParentPlan(parent);
}
@Override
- public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) {
+ public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
optimizedForEach.setParentPlan(parent);
}
@Override
public void visitPreCombinerLocalRearrange(
- POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+ POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
preCombinerLocalRearrange.setParentPlan(parent);
}
-
@Override
public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
throws VisitorException {
mergeCoGrp.setParentPlan(parent);
}
+
+ @Override
+ public void visitReservoirSample(POReservoirSample reservoirSample)
+ throws VisitorException {
+ reservoirSample.setParentPlan(parent);
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Thu Mar 6 18:29:35 2014
@@ -24,6 +24,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -32,9 +33,9 @@ import org.apache.pig.impl.plan.VisitorE
/**
* This visitor visits the MRPlan and does the following
* for each MROper: If the map plan or the reduce plan of the MROper has
- * an end of all input flag present in it, this marks in the MROper whether the map
+ * an end of all input flag present in it, this marks in the MROper whether the map
* has an end of all input flag set or if the reduce has an end of all input flag set.
- *
+ *
*/
public class EndOfAllInputSetter extends MROpPlanVisitor {
@@ -47,43 +48,40 @@ public class EndOfAllInputSetter extends
@Override
public void visitMROp(MapReduceOper mr) throws VisitorException {
-
+
EndOfAllInputChecker checker = new EndOfAllInputChecker(mr.mapPlan);
checker.visit();
if(checker.isEndOfAllInputPresent()) {
- mr.setEndOfAllInputInMap(true);
+ mr.setEndOfAllInputInMap(true);
}
-
+
checker = new EndOfAllInputChecker(mr.reducePlan);
checker.visit();
if(checker.isEndOfAllInputPresent()) {
- mr.setEndOfAllInputInReduce(true);
- }
-
+ mr.setEndOfAllInputInReduce(true);
+ }
+
}
public static class EndOfAllInputChecker extends PhyPlanVisitor {
-
+
private boolean endOfAllInputFlag = false;
public EndOfAllInputChecker(PhysicalPlan plan) {
super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
}
-
- /* (non-Javadoc)
- * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
- */
+
@Override
public void visitStream(POStream stream) throws VisitorException {
// stream present
endOfAllInputFlag = true;
}
-
+
@Override
public void visitMergeJoin(POMergeJoin join) throws VisitorException {
// merge join present
endOfAllInputFlag = true;
}
-
+
@Override
public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
// map side group present
@@ -97,7 +95,12 @@ public class EndOfAllInputSetter extends
}
@Override
- public void visitPartialAgg(POPartialAgg partAgg){
+ public void visitPartialAgg(POPartialAgg partAgg) throws VisitorException {
+ endOfAllInputFlag = true;
+ }
+
+ @Override
+ public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
endOfAllInputFlag = true;
}
@@ -109,4 +112,3 @@ public class EndOfAllInputSetter extends
}
}
}
-
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Mar 6 18:29:35 2014
@@ -65,6 +65,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -115,7 +116,6 @@ public class PhyPlanVisitor extends Plan
popWalker();
}
-
public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
List<PhysicalPlan> inpPlans = mg.getPlans();
for (PhysicalPlan plan : inpPlans) {
@@ -267,12 +267,10 @@ public class PhyPlanVisitor extends Plan
public void visitBinCond(POBinCond binCond) {
// do nothing
-
}
public void visitNegative(PONegative negative) {
//do nothing
-
}
public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
@@ -285,12 +283,10 @@ public class PhyPlanVisitor extends Plan
public void visitMapLookUp(POMapLookUp mapLookUp) {
// TODO Auto-generated method stub
-
}
public void visitCast(POCast cast) {
// TODO Auto-generated method stub
-
}
public void visitLimit(POLimit lim) throws VisitorException{
@@ -323,7 +319,6 @@ public class PhyPlanVisitor extends Plan
*/
public void visitStream(POStream stream) throws VisitorException {
// TODO Auto-generated method stub
-
}
public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
@@ -344,19 +339,20 @@ public class PhyPlanVisitor extends Plan
*/
public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
// TODO Auto-generated method stub
-
}
/**
* @param preCombinerLocalRearrange
*/
public void visitPreCombinerLocalRearrange(
- POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+ POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
// TODO Auto-generated method stub
}
- public void visitPartialAgg(POPartialAgg poPartialAgg) {
+ public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
}
+ public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
+ }
}
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Thu Mar 6 18:29:35 2014
@@ -38,6 +38,10 @@ public class POReservoirSample extends P
private transient int nextSampleIdx= 0;
+ private int rowProcessed = 0;
+
+ private boolean sampleCollectionDone = false;
+
//array to store the result
private transient Result[] samples = null;
@@ -70,19 +74,21 @@ public class POReservoirSample extends P
@Override
public void visit(PhyPlanVisitor v) throws VisitorException {
+ v.visitReservoirSample(this);
}
@Override
public Result getNextTuple() throws ExecException {
- if(samples != null){
+ if (sampleCollectionDone){
return getSample();
}
//else collect samples
- samples = new Result[numSamples];
+ if (samples == null) {
+ samples = new Result[numSamples];
+ }
// populate the samples array with first numSamples tuples
Result res = null;
- int rowProcessed = 0;
while (rowProcessed < numSamples) {
res = processInput();
if (res.returnStatus == POStatus.STATUS_OK) {
@@ -95,26 +101,28 @@ public class POReservoirSample extends P
}
}
- int rowNum = numSamples + 1;
+ int rowNum = rowProcessed;
Random randGen = new Random();
- if (res.returnStatus == POStatus.STATUS_OK) { // did not exhaust all tuples
- while (true) {
- // pick this as sample
- Result sampleResult = processInput();
- if (sampleResult.returnStatus == POStatus.STATUS_NULL) {
- continue;
- } else if (sampleResult.returnStatus != POStatus.STATUS_OK) {
- break;
- }
-
- // collect samples until input is exhausted
- int rand = randGen.nextInt(rowNum);
- if (rand < numSamples) {
- samples[rand] = sampleResult;
- }
- rowNum++;
+ while (true) {
+ // pick this as sample
+ res = processInput();
+ if (res.returnStatus == POStatus.STATUS_NULL) {
+ continue;
+ } else if (res.returnStatus != POStatus.STATUS_OK) {
+ break;
+ }
+
+ // collect samples until input is exhausted
+ int rand = randGen.nextInt(rowNum);
+ if (rand < numSamples) {
+ samples[rand] = res;
}
+ rowNum++;
+ }
+
+ if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) {
+ sampleCollectionDone = true;
}
return getSample();
Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Thu Mar 6 18:29:35 2014
@@ -70,6 +70,7 @@ import org.apache.pig.backend.hadoop.exe
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -469,16 +470,22 @@ public class PlanHelper {
@Override
public void visitPreCombinerLocalRearrange(
- POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+ POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
super.visitPreCombinerLocalRearrange(preCombinerLocalRearrange);
visit(preCombinerLocalRearrange);
}
@Override
- public void visitPartialAgg(POPartialAgg poPartialAgg) {
+ public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
super.visitPartialAgg(poPartialAgg);
visit(poPartialAgg);
}
+
+ @Override
+ public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
+ super.visitReservoirSample(reservoirSample);
+ visit(reservoirSample);
+ }
}
}