You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by wv...@apache.org on 2014/01/23 21:08:07 UTC
git commit: DATAFU-11 ReservoirSample does not behave as expected when grouping by a key other than ALL
Updated Branches:
refs/heads/master 862a7fb3a -> 4414e8cac
DATAFU-11 ReservoirSample does not behave as expected when grouping by a key other than ALL
* For algebraic, clear the reservoir before each use.
* For accumulate, clear reservoir in cleanup and don't null out scoreGen to avoid unnecessary garbage collection.
Signed-off-by: William Vaughan <wv...@linkedin.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/4414e8ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/4414e8ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/4414e8ca
Branch: refs/heads/master
Commit: 4414e8cac8d9126173cef0ff0e98347e800e653c
Parents: 862a7fb
Author: Matt Hayes <mh...@linkedin.com>
Authored: Wed Jan 22 11:51:55 2014 -0800
Committer: William Vaughan <wv...@linkedin.com>
Committed: Thu Jan 23 12:04:26 2014 -0800
----------------------------------------------------------------------
.../datafu/pig/sampling/ReservoirSample.java | 9 ++-
.../pig/sampling/WeightedReservoirSample.java | 8 ++
.../datafu/test/pig/sampling/SamplingTests.java | 81 ++++++++++++++++++++
.../WeightedReservoirSamplingTests.java | 80 +++++++++++++++++++
4 files changed, 176 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4414e8ca/src/java/datafu/pig/sampling/ReservoirSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/ReservoirSample.java b/src/java/datafu/pig/sampling/ReservoirSample.java
index 747b28c..48deaad 100644
--- a/src/java/datafu/pig/sampling/ReservoirSample.java
+++ b/src/java/datafu/pig/sampling/ReservoirSample.java
@@ -97,8 +97,7 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
@Override
public void cleanup()
{
- this.reservoir = null;
- this.scoreGen = null;
+ this.reservoir.clear();
}
@Override
@@ -218,6 +217,8 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
output.add(new ScoredTuple(scoreGen.generateScore(sample), sample).getIntermediateTuple(tupleFactory));
}
} else {
+ getReservoir().clear();
+
for (Tuple sample : samples) {
getReservoir().consider(new ScoredTuple(scoreGen.generateScore(sample), sample));
}
@@ -256,6 +257,8 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
@Override
public Tuple exec(Tuple input) throws IOException {
+ getReservoir().clear();
+
DataBag bagOfSamples = (DataBag) input.get(0);
for (Tuple innerTuple : bagOfSamples) {
DataBag samples = (DataBag) innerTuple.get(0);
@@ -300,6 +303,8 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
@Override
public DataBag exec(Tuple input) throws IOException {
+ getReservoir().clear();
+
DataBag bagOfSamples = (DataBag) input.get(0);
for (Tuple innerTuple : bagOfSamples) {
DataBag samples = (DataBag) innerTuple.get(0);
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4414e8ca/src/java/datafu/pig/sampling/WeightedReservoirSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/WeightedReservoirSample.java b/src/java/datafu/pig/sampling/WeightedReservoirSample.java
index 0792411..92af6a3 100644
--- a/src/java/datafu/pig/sampling/WeightedReservoirSample.java
+++ b/src/java/datafu/pig/sampling/WeightedReservoirSample.java
@@ -252,6 +252,14 @@ public class WeightedReservoirSample extends ReservoirSample {
@Override
public double generateScore(Tuple sample) throws ExecException
{
+ if(this.weightIdx >= sample.size())
+ {
+ throw new ExecException(String.format("Weight index %d is outside tuple bounds", this.weightIdx));
+ }
+ if (sample.get(this.weightIdx) == null)
+ {
+ throw new ExecException(String.format("null value for weight at index %d",this.weightIdx));
+ }
double weight = ((Number)sample.get(this.weightIdx)).doubleValue();
if(Double.compare(weight, 0.0) <= 0)
{
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4414e8ca/test/pig/datafu/test/pig/sampling/SamplingTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SamplingTests.java b/test/pig/datafu/test/pig/sampling/SamplingTests.java
index 0dbbda0..9209133 100644
--- a/test/pig/datafu/test/pig/sampling/SamplingTests.java
+++ b/test/pig/datafu/test/pig/sampling/SamplingTests.java
@@ -23,6 +23,7 @@ import java.io.IOException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -341,6 +342,86 @@ public class SamplingTests extends PigTests
}
}
+ /**
+ register $JAR_PATH
+
+ DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('$RESERVOIR_SIZE');
+ DEFINE Assert datafu.pig.util.Assert();
+
+ data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int);
+ sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C)) as sample_data;
+ sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
+ sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
+ STORE sampled INTO 'output';
+
+ */
+ @Multiline
+ private String reservoirSampleGroupTest;
+
+ /**
+ * Verifies that ReservoirSample works when data is grouped by a key.
+ * In particular it ensures that the reservoir is not reused across keys.
+ *
+ * <p>
+ * This confirms the fix for DATAFU-11.
+ * </p>
+ *
+ * @throws Exception
+ */
+ @Test
+ public void reservoirSampleGroupTest() throws Exception
+ {
+ // first value is the key. last value matches the key so we can
+ // verify the reservoir is reset for each key. values should not
+ // bleed across to other keys.
+ writeLinesToFile("input",
+ "1\tB1\t1",
+ "1\tB1\t1",
+ "1\tB3\t1",
+ "1\tB4\t1",
+ "2\tB1\t2",
+ "2\tB2\t2",
+ "3\tB1\t3",
+ "3\tB1\t3",
+ "3\tB3\t3",
+ "4\tB1\t4",
+ "4\tB2\t4",
+ "4\tB3\t4",
+ "4\tB4\t4",
+ "5\tB1\t5",
+ "6\tB2\t6",
+ "6\tB2\t6",
+ "6\tB3\t6",
+ "7\tB1\t7",
+ "7\tB2\t7",
+ "7\tB3\t7",
+ "8\tB1\t8",
+ "8\tB2\t8",
+ "9\tB3\t9",
+ "9\tB3\t9",
+ "9\tB6\t9",
+ "9\tB5\t9",
+ "10\tB1\t10",
+ "10\tB2\t10",
+ "10\tB2\t10",
+ "10\tB2\t10",
+ "10\tB6\t10",
+ "10\tB7\t10");
+
+ for(int i=1; i<=3; i++) {
+ int reservoirSize = i ;
+ PigTest test = createPigTestFromString(reservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
+ test.runScript();
+
+ List<Tuple> tuples = getLinesForAlias(test, "sampled");
+
+ for (Tuple tuple : tuples)
+ {
+ Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
+ }
+ }
+ }
+
@Test
public void reservoirSampleExecTest() throws IOException
{
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4414e8ca/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
index f507cbd..9bab8a0 100644
--- a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
+++ b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
@@ -44,6 +44,86 @@ import datafu.test.pig.PigTests;
*/
public class WeightedReservoirSamplingTests extends PigTests
{
+ /**
+ register $JAR_PATH
+
+ DEFINE ReservoirSample datafu.pig.sampling.WeightedReservoirSample('$RESERVOIR_SIZE','2');
+ DEFINE Assert datafu.pig.util.Assert();
+
+ data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int, W:double);
+ sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C,W)) as sample_data;
+ sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
+ sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
+ STORE sampled INTO 'output';
+
+ */
+ @Multiline
+ private String weightedReservoirSampleGroupTest;
+
+ /**
+ * Verifies that WeightedReservoirSample works when data is grouped by a key.
+ * In particular it ensures that the reservoir is not reused across keys.
+ *
+ * <p>
+ * This confirms the fix for DATAFU-11.
+ * </p>
+ *
+ * @throws Exception
+ */
+ @Test
+ public void weightedReservoirSampleGroupTest() throws Exception
+ {
+ // first value is the key. second to last value matches the key so we can
+ // verify the reservoir is reset for each key. values should not
+ // bleed across to other keys.
+ writeLinesToFile("input",
+ "1\tB1\t1\t1.0",
+ "1\tB1\t1\t1.0",
+ "1\tB3\t1\t1.0",
+ "1\tB4\t1\t1.0",
+ "2\tB1\t2\t1.0",
+ "2\tB2\t2\t1.0",
+ "3\tB1\t3\t1.0",
+ "3\tB1\t3\t1.0",
+ "3\tB3\t3\t1.0",
+ "4\tB1\t4\t1.0",
+ "4\tB2\t4\t1.0",
+ "4\tB3\t4\t1.0",
+ "4\tB4\t4\t1.0",
+ "5\tB1\t5\t1.0",
+ "6\tB2\t6\t1.0",
+ "6\tB2\t6\t1.0",
+ "6\tB3\t6\t1.0",
+ "7\tB1\t7\t1.0",
+ "7\tB2\t7\t1.0",
+ "7\tB3\t7\t1.0",
+ "8\tB1\t8\t1.0",
+ "8\tB2\t8\t1.0",
+ "9\tB3\t9\t1.0",
+ "9\tB3\t9\t1.0",
+ "9\tB6\t9\t1.0",
+ "9\tB5\t9\t1.0",
+ "10\tB1\t10\t1.0",
+ "10\tB2\t10\t1.0",
+ "10\tB2\t10\t1.0",
+ "10\tB2\t10\t1.0",
+ "10\tB6\t10\t1.0",
+ "10\tB7\t10\t1.0");
+
+ for(int i=1; i<=3; i++) {
+ int reservoirSize = i ;
+ PigTest test = createPigTestFromString(weightedReservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
+ test.runScript();
+
+ List<Tuple> tuples = getLinesForAlias(test, "sampled");
+
+ for (Tuple tuple : tuples)
+ {
+ Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
+ }
+ }
+ }
+
/**
register $JAR_PATH