You are viewing a plain-text version of this content. The canonical version is the original (hyperlinked) mailing-list archive page.
Posted to commits@pig.apache.org by ro...@apache.org on 2017/12/31 01:30:30 UTC
svn commit: r1819711 - in /pig/trunk: CHANGES.txt
src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
Author: rohini
Date: Sun Dec 31 01:30:30 2017
New Revision: 1819711
URL: http://svn.apache.org/viewvc?rev=1819711&view=rev
Log:
PIG-5311: POReservoirSample fails for more than Integer.MAX_VALUE records (rohini)
Modified:
pig/trunk/CHANGES.txt
pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
Modified: pig/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/pig/trunk/CHANGES.txt?rev=1819711&r1=1819710&r2=1819711&view=diff
==============================================================================
--- pig/trunk/CHANGES.txt (original)
+++ pig/trunk/CHANGES.txt Sun Dec 31 01:30:30 2017
@@ -62,6 +62,8 @@ OPTIMIZATIONS
BUG FIXES
+PIG-5311: POReservoirSample fails for more than Integer.MAX_VALUE records (rohini)
+
PIG-3864: ToDate(userstring, format, timezone) computes DateTime with strange handling of Daylight Saving Time with location based timezones (daijy via rohini)
PIG-5312: Uids not set in inner schemas after UNION ONSCHEMA (tmwoodruff via knoguchi)
Modified: pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java
URL: http://svn.apache.org/viewvc/pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java?rev=1819711&r1=1819710&r2=1819711&view=diff
==============================================================================
--- pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java (original)
+++ pig/trunk/src/org/apache/pig/backend/hadoop/executionengine/physicalLayer/relationalOperators/POReservoirSample.java Sun Dec 31 01:30:30 2017
@@ -18,8 +18,9 @@
package org.apache.pig.backend.hadoop.executionengine.physicalLayer.relationalOperators;
import java.util.List;
-import java.util.Random;
+import org.apache.commons.math3.random.RandomDataGenerator;
+import org.apache.hadoop.mapreduce.JobContext;
import org.apache.pig.backend.executionengine.ExecException;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.POStatus;
import org.apache.pig.backend.hadoop.executionengine.physicalLayer.PhysicalOperator;
@@ -29,17 +30,18 @@ import org.apache.pig.data.Tuple;
import org.apache.pig.impl.builtin.PoissonSampleLoader;
import org.apache.pig.impl.plan.OperatorKey;
import org.apache.pig.impl.plan.VisitorException;
+import org.apache.pig.impl.util.UDFContext;
public class POReservoirSample extends PhysicalOperator {
private static final long serialVersionUID = 1L;
// number of samples to be sampled
- protected int numSamples;
+ protected long numSamples;
private transient int nextSampleIdx = 0;
- private transient int rowProcessed = 0;
+ private transient long rowProcessed = 0;
private transient boolean sampleCollectionDone = false;
@@ -49,6 +51,8 @@ public class POReservoirSample extends P
// last sample result
private transient Result lastSample = null;
+ private transient RandomDataGenerator randGen;
+
public POReservoirSample(OperatorKey k) {
this(k, -1, null);
}
@@ -65,7 +69,7 @@ public class POReservoirSample extends P
super(k, rp, inp);
}
- public POReservoirSample(OperatorKey k, int rp, List<PhysicalOperator> inp, int numSamples) {
+ public POReservoirSample(OperatorKey k, int rp, List<PhysicalOperator> inp, long numSamples) {
super(k, rp, inp);
this.numSamples = numSamples;
}
@@ -88,7 +92,11 @@ public class POReservoirSample extends P
}
//else collect samples
if (samples == null) {
- samples = new Result[numSamples];
+ samples = new Result[(int)numSamples];
+ long taskIdHashCode = UDFContext.getUDFContext().getJobConf().get(JobContext.TASK_ID).hashCode();
+ long randomSeed = ((long)taskIdHashCode << 32) | (taskIdHashCode & 0xffffffffL);
+ randGen = new RandomDataGenerator();
+ randGen.reSeed(randomSeed);
}
// populate the samples array with first numSamples tuples
@@ -96,7 +104,7 @@ public class POReservoirSample extends P
while (rowProcessed < numSamples) {
res = processInput();
if (res.returnStatus == POStatus.STATUS_OK) {
- samples[rowProcessed] = res;
+ samples[(int)rowProcessed] = res;
rowProcessed++;
} else if (res.returnStatus == POStatus.STATUS_NULL) {
continue;
@@ -114,7 +122,6 @@ public class POReservoirSample extends P
}
if (res == null || res.returnStatus != POStatus.STATUS_EOP) {
- Random randGen = new Random();
while (true) {
// pick this as sample
res = processInput();
@@ -125,9 +132,9 @@ public class POReservoirSample extends P
}
// collect samples until input is exhausted
- int rand = randGen.nextInt(rowProcessed + 1);
+ long rand = randGen.nextLong(0, rowProcessed + 1);
if (rand < numSamples) {
- samples[rand] = res;
+ samples[(int)rand] = res;
}
rowProcessed++;
}
@@ -170,13 +177,15 @@ public class POReservoirSample extends P
if (illustrator != null) {
illustratorMarkup(samples[nextSampleIdx].result, samples[nextSampleIdx].result, 0);
}
- Result res = samples[nextSampleIdx++];
+ Result res = samples[nextSampleIdx];
+ samples[nextSampleIdx++] = null; //Free memory
if (res == null) { // Input data has lesser rows than numSamples
return RESULT_EMPTY;
}
return res;
}
else{
+ samples = null; // Free memory
return RESULT_EOP;
}
}
@@ -212,7 +221,8 @@ public class POReservoirSample extends P
}
t.set(sz, PoissonSampleLoader.NUMROWS_TUPLE_MARKER);
- t.set(sz + 1, (long)rowProcessed);
+ t.set(sz + 1, rowProcessed);
return new Result(POStatus.STATUS_OK, t);
}
+
}