You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/12/31 01:30:30 UTC

svn commit: r1819711 - in /pig/trunk: CHANGES.txt src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java

Author: rohini
Date: Sun Dec 31 01:30:30 2017
New Revision: 1819711

URL: http://svn.apache.org/viewvc?rev=1819711&view=rev
Log:
PIG-5311: POReservoirSample fails for more than Integer.MAX_VALUE records (rohini)

Modified:
    pig/trunk/CHANGES.txt
    pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java

Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1819711&r1=1819710&r2=1819711&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Dec 31 01:30:30 2017
@@ -62,6 +62,8 @@ OPTIMIZATIONS
  
 BUG FIXES
 
+PIG-5311: POReservoirSample fails for more than Integer.MAX_VALUE records (rohini)
+
 PIG-3864: ToDate(userstring, format, timezone) computes DateTime with strange handling of Daylight Saving Time with location based timezones (daijy via rohini)
 
 PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi)

Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1819711&r1=1819710&r2=1819711&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Sun Dec 31 01:30:30 2017
@@ -18,8 +18,9 @@
 package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
 
 import java.util.List;
-import java.util.Random;
 
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.pig.backend.executionengine.ExecException;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
 import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -29,17 +30,18 @@ import org.apache.pig.data.Tuple;
 import org.apache.pig.impl.builtin.PoissonSampleLoader;
 import org.apache.pig.impl.plan.OperatorKey;
 import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext;
 
 public class POReservoirSample extends PhysicalOperator {
 
     private static final long serialVersionUID = 1L;
 
     // number of samples to be sampled
-    protected int numSamples;
+    protected long numSamples;
 
     private transient int nextSampleIdx = 0;
 
-    private transient int rowProcessed = 0;
+    private transient long rowProcessed = 0;
 
     private transient boolean sampleCollectionDone = false;
 
@@ -49,6 +51,8 @@ public class POReservoirSample extends P
     // last sample result
     private transient Result lastSample = null;
 
+    private transient RandomDataGenerator randGen;
+
     public POReservoirSample(OperatorKey k) {
         this(k, -1, null);
     }
@@ -65,7 +69,7 @@ public class POReservoirSample extends P
         super(k, rp, inp);
     }
 
-    public POReservoirSample(OperatorKey k, int rp, List<PhysicalOperator> inp, int numSamples) {
+    public POReservoirSample(OperatorKey k, int rp, List<PhysicalOperator> inp, long numSamples) {
         super(k, rp, inp);
         this.numSamples = numSamples;
     }
@@ -88,7 +92,11 @@ public class POReservoirSample extends P
         }
         //else collect samples
         if (samples == null) {
-            samples = new Result[numSamples];
+            samples = new Result[(int)numSamples];
+            long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode();
+            long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
+            randGen = new RandomDataGenerator();
+            randGen.reSeed(randomSeed);
         }
 
         // populate the samples array with first numSamples tuples
@@ -96,7 +104,7 @@ public class POReservoirSample extends P
         while (rowProcessed < numSamples) {
             res = processInput();
             if (res.returnStatus == POStatus.STATUS_OK) {
-                samples[rowProcessed] = res;
+                samples[(int)rowProcessed] = res;
                 rowProcessed++;
             } else if (res.returnStatus == POStatus.STATUS_NULL) {
                 continue;
@@ -114,7 +122,6 @@ public class POReservoirSample extends P
         }
 
         if (res == null || res.returnStatus != POStatus.STATUS_EOP) {
-            Random randGen = new Random();
             while (true) {
                 // pick this as sample
                 res = processInput();
@@ -125,9 +132,9 @@ public class POReservoirSample extends P
                 }
 
                 // collect samples until input is exhausted
-                int rand = randGen.nextInt(rowProcessed + 1);
+                long rand = randGen.nextLong(0, rowProcessed + 1);
                 if (rand < numSamples) {
-                    samples[rand] = res;
+                    samples[(int)rand] = res;
                 }
                 rowProcessed++;
             }
@@ -170,13 +177,15 @@ public class POReservoirSample extends P
             if (illustrator != null) {
                 illustratorMarkup(samples[nextSampleIdx].result, samples[nextSampleIdx].result, 0);
             }
-            Result res = samples[nextSampleIdx++];
+            Result res = samples[nextSampleIdx];
+            samples[nextSampleIdx++] = null; //Free memory
             if (res == null) { // Input data has lesser rows than numSamples
                 return RESULT_EMPTY;
             }
             return res;
         }
         else{
+            samples = null; // Free memory
             return RESULT_EOP;
         }
     }
@@ -212,7 +221,8 @@ public class POReservoirSample extends P
         }
 
         t.set(sz, PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
-        t.set(sz + 1, (long)rowProcessed);
+        t.set(sz + 1, rowProcessed);
         return new Result(POStatus.STATUS_OK, t);
     }
+
 }