You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to commits@pig.apache.org by ch...@apache.org on 2014/03/06 19:29:35 UTC

svn commit: r1574986 - in /pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine: mapReduceLayer/ mapReduceLayer/plans/ physicalLayer/plans/ physicalLayer/relationalOperators/ physicalLayer/util/

Author: cheolsoo
Date: Thu Mar  6 18:29:35 2014
New Revision: 1574986

URL: http://svn.apache.org/r1574986
Log:
PIG-3786: POReservoirSample should handle endOfAllInput flag (daijy)

Modified:
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
    pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/PhyPlanSetter.java Thu Mar  6 18:29:35 2014
@@ -64,6 +64,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -330,25 +331,30 @@ public class PhyPlanSetter extends PhyPl
      */
 
     @Override
-    public void visitPartialAgg(POPartialAgg poPartialAgg) {
+    public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
        poPartialAgg.setParentPlan(parent);
     }
 
     @Override
-    public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) {
+    public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
         optimizedForEach.setParentPlan(parent);
     }
 
     @Override
     public void visitPreCombinerLocalRearrange(
-            POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+            POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
         preCombinerLocalRearrange.setParentPlan(parent);
     }
 
-
     @Override
     public void visitMergeCoGroup(POMergeCogroup mergeCoGrp)
             throws VisitorException {
         mergeCoGrp.setParentPlan(parent);
     }
+
+    @Override
+    public void visitReservoirSample(POReservoirSample reservoirSample)
+            throws VisitorException {
+        reservoirSample.setParentPlan(parent);
+    }
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/mapReduceLayer/plans/EndOfAllInputSetter.java Thu Mar  6 18:29:35 2014
@@ -24,6 +24,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeCogroup;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POMergeJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POStream;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POCollectedGroup;
 import org.apache.pig.impl.plan.DepthFirstWalker;
@@ -32,9 +33,9 @@ import org.apache.pig.impl.plan.VisitorE
 /**
  * This visitor visits the MRPlan and does the following
  * for each MROper: If the map plan or the reduce plan of the MROper has
- *  an end of all input flag present in it, this marks in the MROper whether the map 
+ *  an end of all input flag present in it, this marks in the MROper whether the map
  * has an end of all input flag set or if the reduce has an end of all input flag set.
- *  
+ *
  */
 public class EndOfAllInputSetter extends MROpPlanVisitor {
 
@@ -47,43 +48,40 @@ public class EndOfAllInputSetter extends
 
     @Override
     public void visitMROp(MapReduceOper mr) throws VisitorException {
-        
+
         EndOfAllInputChecker checker = new EndOfAllInputChecker(mr.mapPlan);
         checker.visit();
         if(checker.isEndOfAllInputPresent()) {
-            mr.setEndOfAllInputInMap(true);            
+            mr.setEndOfAllInputInMap(true);
         }
-        
+
         checker = new EndOfAllInputChecker(mr.reducePlan);
         checker.visit();
         if(checker.isEndOfAllInputPresent()) {
-            mr.setEndOfAllInputInReduce(true);            
-        }      
-        
+            mr.setEndOfAllInputInReduce(true);
+        }
+
     }
 
     public static class EndOfAllInputChecker extends PhyPlanVisitor {
-        
+
         private boolean endOfAllInputFlag = false;
         public EndOfAllInputChecker(PhysicalPlan plan) {
             super(plan, new DepthFirstWalker<PhysicalOperator, PhysicalPlan>(plan));
         }
-        
-        /* (non-Javadoc)
-         * @see org.apache.pig.backend.hadoop.executionengine.physicalLayer.plans.PhyPlanVisitor#visitStream(org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStream)
-         */
+
         @Override
         public void visitStream(POStream stream) throws VisitorException {
             // stream present
             endOfAllInputFlag = true;
         }
-        
+
         @Override
         public void visitMergeJoin(POMergeJoin join) throws VisitorException {
             // merge join present
             endOfAllInputFlag = true;
         }
-       
+
         @Override
         public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException {
             // map side group present
@@ -97,7 +95,12 @@ public class EndOfAllInputSetter extends
         }
 
         @Override
-        public void visitPartialAgg(POPartialAgg partAgg){
+        public void visitPartialAgg(POPartialAgg partAgg) throws VisitorException {
+            endOfAllInputFlag = true;
+        }
+
+        @Override
+        public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
             endOfAllInputFlag = true;
         }
 
@@ -109,4 +112,3 @@ public class EndOfAllInputSetter extends
         }
     }
 }
-

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/plans/PhyPlanVisitor.java Thu Mar  6 18:29:35 2014
@@ -65,6 +65,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.PORank;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -115,7 +116,6 @@ public class PhyPlanVisitor extends Plan
         popWalker();
     }
 
-
     public void visitCollectedGroup(POCollectedGroup mg) throws VisitorException{
         List<PhysicalPlan> inpPlans = mg.getPlans();
         for (PhysicalPlan plan : inpPlans) {
@@ -267,12 +267,10 @@ public class PhyPlanVisitor extends Plan
 
     public void visitBinCond(POBinCond binCond) {
         // do nothing
-
     }
 
     public void visitNegative(PONegative negative) {
         //do nothing
-
     }
 
     public void visitUserFunc(POUserFunc userFunc) throws VisitorException {
@@ -285,12 +283,10 @@ public class PhyPlanVisitor extends Plan
 
     public void visitMapLookUp(POMapLookUp mapLookUp) {
         // TODO Auto-generated method stub
-
     }
 
     public void visitCast(POCast cast) {
         // TODO Auto-generated method stub
-
     }
 
     public void visitLimit(POLimit lim) throws VisitorException{
@@ -323,7 +319,6 @@ public class PhyPlanVisitor extends Plan
      */
     public void visitStream(POStream stream) throws VisitorException {
         // TODO Auto-generated method stub
-
     }
 
     public void visitSkewedJoin(POSkewedJoin sk) throws VisitorException {
@@ -344,19 +339,20 @@ public class PhyPlanVisitor extends Plan
      */
     public void visitPOOptimizedForEach(POOptimizedForEach optimizedForEach) throws VisitorException {
         // TODO Auto-generated method stub
-
     }
 
     /**
      * @param preCombinerLocalRearrange
      */
     public void visitPreCombinerLocalRearrange(
-            POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+            POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
         // TODO Auto-generated method stub
     }
 
-    public void visitPartialAgg(POPartialAgg poPartialAgg) {
+    public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
     }
 
+    public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
+    }
 
 }

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Thu Mar  6 18:29:35 2014
@@ -38,6 +38,10 @@ public class POReservoirSample extends P
 
     private transient int nextSampleIdx= 0;
 
+    private int rowProcessed = 0;
+
+    private boolean sampleCollectionDone = false;
+
     //array to store the result
     private transient Result[] samples = null;
 
@@ -70,19 +74,21 @@ public class POReservoirSample extends P
 
     @Override
     public void visit(PhyPlanVisitor v) throws VisitorException {
+        v.visitReservoirSample(this);
     }
 
     @Override
     public Result getNextTuple() throws ExecException {
-        if(samples != null){
+        if (sampleCollectionDone){
             return getSample();
         }
         //else collect samples
-        samples = new Result[numSamples];
+        if (samples == null) {
+            samples = new Result[numSamples];
+        }
 
         // populate the samples array with first numSamples tuples
         Result res = null;
-        int rowProcessed = 0;
         while (rowProcessed < numSamples) {
             res = processInput();
             if (res.returnStatus == POStatus.STATUS_OK) {
@@ -95,26 +101,28 @@ public class POReservoirSample extends P
             }
         }
 
-        int rowNum = numSamples + 1;
+        int rowNum = rowProcessed;
         Random randGen = new Random();
 
-        if (res.returnStatus == POStatus.STATUS_OK) { // did not exhaust all tuples
-            while (true) {
-                // pick this as sample
-                Result sampleResult = processInput();
-                if (sampleResult.returnStatus == POStatus.STATUS_NULL) {
-                    continue;
-                } else if (sampleResult.returnStatus != POStatus.STATUS_OK) {
-                    break;
-                }
-
-                // collect samples until input is exhausted
-                int rand = randGen.nextInt(rowNum);
-                if (rand < numSamples) {
-                    samples[rand] = sampleResult;
-                }
-                rowNum++;
+        while (true) {
+            // pick this as sample
+            res = processInput();
+            if (res.returnStatus == POStatus.STATUS_NULL) {
+                continue;
+            } else if (res.returnStatus != POStatus.STATUS_OK) {
+                break;
+            }
+
+            // collect samples until input is exhausted
+            int rand = randGen.nextInt(rowNum);
+            if (rand < numSamples) {
+                samples[rand] = res;
             }
+            rowNum++;
+        }
+
+        if (this.parentPlan.endOfAllInput && res.returnStatus == POStatus.STATUS_EOP) {
+            sampleCollectionDone = true;
         }
 
         return getSample();

Modified: pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java
URL: http://svn.apache.org/viewvc/pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java?rev=1574986&r1=1574985&r2=1574986&view=diff
==============================================================================
--- pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java (original)
+++ pig/branches/tez/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/util/PlanHelper.java Thu Mar  6 18:29:35 2014
@@ -70,6 +70,7 @@ import org.apache.pig.backend.hadoop.exe
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartialAgg;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPartitionRearrange;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POPreCombinerLocalRearrange;
+import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POReservoirSample;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSkewedJoin;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSort;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators.POSplit;
@@ -469,16 +470,22 @@ public class PlanHelper {
 
         @Override
         public void visitPreCombinerLocalRearrange(
-                POPreCombinerLocalRearrange preCombinerLocalRearrange) {
+                POPreCombinerLocalRearrange preCombinerLocalRearrange) throws VisitorException {
             super.visitPreCombinerLocalRearrange(preCombinerLocalRearrange);
             visit(preCombinerLocalRearrange);
         }
 
         @Override
-        public void visitPartialAgg(POPartialAgg poPartialAgg) {
+        public void visitPartialAgg(POPartialAgg poPartialAgg) throws VisitorException {
             super.visitPartialAgg(poPartialAgg);
             visit(poPartialAgg);
         }
+
+        @Override
+        public void visitReservoirSample(POReservoirSample reservoirSample) throws VisitorException {
+            super.visitReservoirSample(reservoirSample);
+            visit(reservoirSample);
+        }
     }
 
 }