You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by wv...@apache.org on 2014/01/23 21:08:07 UTC

git commit: DATAFU-11 ReservoirSample does not behave as expected when grouping by a key other than ALL

Updated Branches:
  refs/heads/master 862a7fb3a -> 4414e8cac


DATAFU-11 ReservoirSample does not behave as expected when grouping by a key other than ALL

* For the Algebraic implementation, clear the reservoir before each use.
* For the Accumulator implementation, clear the reservoir in cleanup() instead of nulling it out, and don't null out scoreGen, to avoid unnecessary garbage collection.

Signed-off-by: William Vaughan <wv...@linkedin.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/4414e8ca
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/4414e8ca
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/4414e8ca

Branch: refs/heads/master
Commit: 4414e8cac8d9126173cef0ff0e98347e800e653c
Parents: 862a7fb
Author: Matt Hayes <mh...@linkedin.com>
Authored: Wed Jan 22 11:51:55 2014 -0800
Committer: William Vaughan <wv...@linkedin.com>
Committed: Thu Jan 23 12:04:26 2014 -0800

----------------------------------------------------------------------
 .../datafu/pig/sampling/ReservoirSample.java    |  9 ++-
 .../pig/sampling/WeightedReservoirSample.java   |  8 ++
 .../datafu/test/pig/sampling/SamplingTests.java | 81 ++++++++++++++++++++
 .../WeightedReservoirSamplingTests.java         | 80 +++++++++++++++++++
 4 files changed, 176 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4414e8ca/src/java/datafu/pig/sampling/ReservoirSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/ReservoirSample.java b/src/java/datafu/pig/sampling/ReservoirSample.java
index 747b28c..48deaad 100644
--- a/src/java/datafu/pig/sampling/ReservoirSample.java
+++ b/src/java/datafu/pig/sampling/ReservoirSample.java
@@ -97,8 +97,7 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
   @Override
   public void cleanup()
   {
-    this.reservoir = null;
-    this.scoreGen = null;
+    this.reservoir.clear();
   }
 
   @Override
@@ -218,6 +217,8 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
           output.add(new ScoredTuple(scoreGen.generateScore(sample), sample).getIntermediateTuple(tupleFactory));
         }
       } else {     
+        getReservoir().clear();
+        
         for (Tuple sample : samples) {
           getReservoir().consider(new ScoredTuple(scoreGen.generateScore(sample), sample));
         }    
@@ -256,6 +257,8 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
 
     @Override
     public Tuple exec(Tuple input) throws IOException {
+      getReservoir().clear();
+      
       DataBag bagOfSamples = (DataBag) input.get(0);
       for (Tuple innerTuple : bagOfSamples) {
         DataBag samples = (DataBag) innerTuple.get(0);        
@@ -300,6 +303,8 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     
     @Override
     public DataBag exec(Tuple input) throws IOException {
+      getReservoir().clear();
+      
       DataBag bagOfSamples = (DataBag) input.get(0);
       for (Tuple innerTuple : bagOfSamples) {
         DataBag samples = (DataBag) innerTuple.get(0);        

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4414e8ca/src/java/datafu/pig/sampling/WeightedReservoirSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/WeightedReservoirSample.java b/src/java/datafu/pig/sampling/WeightedReservoirSample.java
index 0792411..92af6a3 100644
--- a/src/java/datafu/pig/sampling/WeightedReservoirSample.java
+++ b/src/java/datafu/pig/sampling/WeightedReservoirSample.java
@@ -252,6 +252,14 @@ public class WeightedReservoirSample extends ReservoirSample {
         @Override
         public double generateScore(Tuple sample) throws ExecException
         {
+            if(this.weightIdx >= sample.size())
+            {
+                throw new ExecException(String.format("Weight index %d is outside tuple bounds", this.weightIdx));
+            }
+            if (sample.get(this.weightIdx) == null)
+            { 
+                throw new ExecException(String.format("null value for weight at index %d",this.weightIdx));
+            }
             double weight = ((Number)sample.get(this.weightIdx)).doubleValue();
             if(Double.compare(weight, 0.0) <= 0)
             {

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4414e8ca/test/pig/datafu/test/pig/sampling/SamplingTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SamplingTests.java b/test/pig/datafu/test/pig/sampling/SamplingTests.java
index 0dbbda0..9209133 100644
--- a/test/pig/datafu/test/pig/sampling/SamplingTests.java
+++ b/test/pig/datafu/test/pig/sampling/SamplingTests.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -341,6 +342,86 @@ public class SamplingTests extends PigTests
     }
   }
   
+  /**
+  register $JAR_PATH
+
+  DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('$RESERVOIR_SIZE');
+  DEFINE Assert datafu.pig.util.Assert();
+  
+  data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int);
+  sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C)) as sample_data;
+  sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
+  sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
+  STORE sampled INTO 'output';
+
+   */
+  @Multiline
+  private String reservoirSampleGroupTest;
+  
+  /**
+   * Verifies that ReservoirSample works when data grouped by a key.
+   * In particular it ensures that the reservoir is not reused across keys.
+   * 
+   * <p>
+   * This confirms the fix for DATAFU-11.
+   * </p>
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void reservoirSampleGroupTest() throws Exception
+  {    
+    // first value is the key.  last value matches the key so we can
+    // verify the register is reset for each key.  values should not
+    // bleed across to other keys.
+    writeLinesToFile("input",
+                     "1\tB1\t1",
+                     "1\tB1\t1",
+                     "1\tB3\t1",
+                     "1\tB4\t1",
+                     "2\tB1\t2",
+                     "2\tB2\t2",
+                     "3\tB1\t3",
+                     "3\tB1\t3",
+                     "3\tB3\t3",
+                     "4\tB1\t4",
+                     "4\tB2\t4",
+                     "4\tB3\t4",
+                     "4\tB4\t4",
+                     "5\tB1\t5",
+                     "6\tB2\t6",
+                     "6\tB2\t6",
+                     "6\tB3\t6",
+                     "7\tB1\t7",
+                     "7\tB2\t7",
+                     "7\tB3\t7",
+                     "8\tB1\t8",
+                     "8\tB2\t8",
+                     "9\tB3\t9",
+                     "9\tB3\t9",
+                     "9\tB6\t9",
+                     "9\tB5\t9",
+                     "10\tB1\t10",
+                     "10\tB2\t10",
+                     "10\tB2\t10",
+                     "10\tB2\t10",
+                     "10\tB6\t10",
+                     "10\tB7\t10");
+   
+    for(int i=1; i<=3; i++) {
+      int reservoirSize = i ;
+      PigTest test = createPigTestFromString(reservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
+      test.runScript();
+      
+      List<Tuple> tuples = getLinesForAlias(test, "sampled");
+      
+      for (Tuple tuple : tuples)
+      {
+        Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
+      }
+    }
+  }
+  
   @Test
   public void reservoirSampleExecTest() throws IOException
   {

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/4414e8ca/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
index f507cbd..9bab8a0 100644
--- a/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
+++ b/test/pig/datafu/test/pig/sampling/WeightedReservoirSamplingTests.java
@@ -44,6 +44,86 @@ import datafu.test.pig.PigTests;
  */
 public class WeightedReservoirSamplingTests extends PigTests
 {
+  /**
+  register $JAR_PATH
+
+  DEFINE ReservoirSample datafu.pig.sampling.WeightedReservoirSample('$RESERVOIR_SIZE','2');
+  DEFINE Assert datafu.pig.util.Assert();
+  
+  data = LOAD 'input' AS (A_id:int, B_id:chararray, C:int, W:double);
+  sampled = FOREACH (GROUP data BY A_id) GENERATE group as A_id, ReservoirSample(data.(B_id,C,W)) as sample_data;
+  sampled = FILTER sampled BY Assert((SIZE(sample_data) <= $RESERVOIR_SIZE ? 1 : 0), 'must be <= $RESERVOIR_SIZE');
+  sampled = FOREACH sampled GENERATE A_id, FLATTEN(sample_data);
+  STORE sampled INTO 'output';
+
+   */
+  @Multiline
+  private String weightedReservoirSampleGroupTest;
+  
+  /**
+   * Verifies that WeightedReservoirSample works when data grouped by a key.
+   * In particular it ensures that the reservoir is not reused across keys.
+   * 
+   * <p>
+   * This confirms the fix for DATAFU-11.
+   * </p>
+   * 
+   * @throws Exception
+   */
+  @Test
+  public void weightedReservoirSampleGroupTest() throws Exception
+  {    
+    // first value is the key.  second to last value matches the key so we can
+    // verify the register is reset for each key.  values should not
+    // bleed across to other keys.
+    writeLinesToFile("input",
+                     "1\tB1\t1\t1.0",
+                     "1\tB1\t1\t1.0",
+                     "1\tB3\t1\t1.0",
+                     "1\tB4\t1\t1.0",
+                     "2\tB1\t2\t1.0",
+                     "2\tB2\t2\t1.0",
+                     "3\tB1\t3\t1.0",
+                     "3\tB1\t3\t1.0",
+                     "3\tB3\t3\t1.0",
+                     "4\tB1\t4\t1.0",
+                     "4\tB2\t4\t1.0",
+                     "4\tB3\t4\t1.0",
+                     "4\tB4\t4\t1.0",
+                     "5\tB1\t5\t1.0",
+                     "6\tB2\t6\t1.0",
+                     "6\tB2\t6\t1.0",
+                     "6\tB3\t6\t1.0",
+                     "7\tB1\t7\t1.0",
+                     "7\tB2\t7\t1.0",
+                     "7\tB3\t7\t1.0",
+                     "8\tB1\t8\t1.0",
+                     "8\tB2\t8\t1.0",
+                     "9\tB3\t9\t1.0",
+                     "9\tB3\t9\t1.0",
+                     "9\tB6\t9\t1.0",
+                     "9\tB5\t9\t1.0",
+                     "10\tB1\t10\t1.0",
+                     "10\tB2\t10\t1.0",
+                     "10\tB2\t10\t1.0",
+                     "10\tB2\t10\t1.0",
+                     "10\tB6\t10\t1.0",
+                     "10\tB7\t10\t1.0");
+   
+    for(int i=1; i<=3; i++) {
+      int reservoirSize = i ;
+      PigTest test = createPigTestFromString(weightedReservoirSampleGroupTest, "RESERVOIR_SIZE="+reservoirSize);
+      test.runScript();
+      
+      List<Tuple> tuples = getLinesForAlias(test, "sampled");
+      
+      for (Tuple tuple : tuples)
+      {
+        Assert.assertEquals(((Number)tuple.get(0)).intValue(), ((Number)tuple.get(2)).intValue());
+      }
+    }
+  }
+  
  /** 
   register $JAR_PATH