Posted to dev@datafu.apache.org by "jian wang (JIRA)" <ji...@apache.org> on 2014/01/19 15:22:19 UTC

[jira] [Comment Edited] (DATAFU-11) ReservoirSample does not behave as expected when grouping by a key other than ALL

    [ https://issues.apache.org/jira/browse/DATAFU-11?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13875677#comment-13875677 ] 

jian wang edited comment on DATAFU-11 at 1/19/14 2:20 PM:
----------------------------------------------------------

Sharing some of my investigation.

I added some debug output to Initial, Intermediate and Final. It shows that exec() on the same Intermediate instance, and on the same Final instance, is called multiple times, once for the grouped mapper output of each group key. As a result, the reservoir field is shared across the outputs of different groups.
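The following is a simplified, hypothetical model of the Intermediate step that makes the effect concrete. It is not DataFu code. It assumes that the reservoir keeps the `numSamples` highest-scored tuples, which is consistent with the debug output further down. `BuggyIntermediate`, `Scored`, `A1` and `A2` are illustrative names, and the scores are copied from the first Intermed run in that output. Because the heap is an instance field, the second exec() call (for group a2) starts with a1's survivors already in the reservoir.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of the current Intermediate/Final behaviour; not DataFu code.
public class SharedReservoirDemo {
    // A tuple (kept as its printed form) paired with its random score.
    public record Scored(double score, String tuple) {}

    // Scores copied from the first Intermed run in the debug output.
    public static final List<Scored> A1 = List.of(
        new Scored(0.881896844845867, "(a1,5)"),
        new Scored(0.7843315204002389, "(a1,6)"),
        new Scored(0.8355896762512851, "(a1,7)"));
    public static final List<Scored> A2 = List.of(
        new Scored(0.34698641636909844, "(a2,5)"),
        new Scored(0.8827556451749948, "(a2,6)"),
        new Scored(0.5957191900640255, "(a2,7)"));

    public static class BuggyIntermediate {
        private final int numSamples;
        // Instance field: survives from one exec() call to the next.
        private final PriorityQueue<Scored> reservoir =
            new PriorityQueue<>(Comparator.comparingDouble(Scored::score));

        public BuggyIntermediate(int numSamples) {
            this.numSamples = numSamples;
        }

        public List<String> exec(List<Scored> groupInput) {
            for (Scored s : groupInput) {
                reservoir.add(s);
                if (reservoir.size() > numSamples) {
                    reservoir.poll(); // evict the lowest score
                }
            }
            List<Scored> kept = new ArrayList<>(reservoir);
            kept.sort(Comparator.comparingDouble(Scored::score));
            List<String> out = new ArrayList<>();
            for (Scored s : kept) {
                out.add(s.tuple());
            }
            return out;
        }
    }

    public static void main(String[] args) {
        BuggyIntermediate intermed = new BuggyIntermediate(2);
        System.out.println("a1 -> " + intermed.exec(A1)); // a1 -> [(a1,7), (a1,5)]
        System.out.println("a2 -> " + intermed.exec(A2)); // a2 -> [(a1,5), (a2,6)]
    }
}
```

Running the model produces the same two reservoirs as the log, so (a1,5) leaks into the a2 sample.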

The same holds for Initial: a single Initial instance's exec() is called on all of the mapper input, even when that input belongs to different group keys.

I tried Barbara's solution, and it resolves the shared-reservoir problem in the Intermediate and Final instances. However, I am not sure whether we also need to handle Initial input that contains samples under different keys, e.g. if <a1, 1> and <a2, 1> both appear in the input sample argument of a single invocation of Initial's exec().
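Here is a sketch of the fix. It assumes that Barbara's solution amounts to the commented-out `//the working way` line in the diff below, which allocates a new Reservoir inside each exec() call instead of reusing the field. `FixedIntermediate` and the other names are hypothetical. As before, the reservoir is modelled as a min-heap that keeps the `numSamples` highest scores. The same instance is reused for both groups, as in the log below, but no state survives between calls.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical model of Intermediate/Final with a per-call reservoir; not DataFu code.
public class FreshReservoirDemo {
    // A tuple (kept as its printed form) paired with its random score.
    public record Scored(double score, String tuple) {}

    // Scores copied from the first Intermed run in the debug output.
    public static final List<Scored> A1 = List.of(
        new Scored(0.881896844845867, "(a1,5)"),
        new Scored(0.7843315204002389, "(a1,6)"),
        new Scored(0.8355896762512851, "(a1,7)"));
    public static final List<Scored> A2 = List.of(
        new Scored(0.34698641636909844, "(a2,5)"),
        new Scored(0.8827556451749948, "(a2,6)"),
        new Scored(0.5957191900640255, "(a2,7)"));

    public static class FixedIntermediate {
        private final int numSamples;

        public FixedIntermediate(int numSamples) {
            this.numSamples = numSamples;
        }

        public List<String> exec(List<Scored> groupInput) {
            // Local reservoir: created per call, so nothing leaks between groups.
            PriorityQueue<Scored> r =
                new PriorityQueue<>(Comparator.comparingDouble(Scored::score));
            for (Scored s : groupInput) {
                r.add(s);
                if (r.size() > numSamples) {
                    r.poll(); // evict the lowest score
                }
            }
            List<Scored> kept = new ArrayList<>(r);
            kept.sort(Comparator.comparingDouble(Scored::score));
            List<String> out = new ArrayList<>();
            for (Scored s : kept) {
                out.add(s.tuple());
            }
            return out;
        }
    }

    public static void main(String[] args) {
        FixedIntermediate intermed = new FixedIntermediate(2);
        System.out.println("a1 -> " + intermed.exec(A1)); // a1 -> [(a1,7), (a1,5)]
        System.out.println("a2 -> " + intermed.exec(A2)); // a2 -> [(a2,7), (a2,6)]
    }
}
```

The sketch does not address the Initial question. In the logged runs, every Initial call received a single-tuple bag, so mixed keys within one call never arose.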

Test diff that adds debug output to ReservoirSample:

--- a/src/java/datafu/pig/sampling/ReservoirSample.java
+++ b/src/java/datafu/pig/sampling/ReservoirSample.java
@@ -176,10 +176,14 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     protected ScoredTuple.ScoreGenerator scoreGen;
     TupleFactory tupleFactory = TupleFactory.getInstance();
     
+    private long id;
+    
     public Initial(){}
     
     public Initial(String numSamples)
     {
+     id = System.nanoTime();
+     System.out.println("Create initial instance, numSamples: " + numSamples + ", id: " + id);
       this.numSamples = Integer.parseInt(numSamples);
     }
     
@@ -206,7 +210,11 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
       
       ScoredTuple.ScoreGenerator scoreGen = getScoreGenerator();
       
+      
       DataBag samples = (DataBag) input.get(0);
+      
+      int preConsiderReservoirSize = getReservoir().size(); 
+      
       if (samples == null)
       {
         // do nothing
@@ -217,7 +225,8 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
           // add the score on to the intermediate tuple
           output.add(new ScoredTuple(scoreGen.generateScore(sample), sample).getIntermediateTuple(tupleFactory));
         }
      } else {            
         for (Tuple sample : samples) {
           getReservoir().consider(new ScoredTuple(scoreGen.generateScore(sample), sample));
         }    
@@ -227,7 +236,11 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
           output.add(scoredTuple.getIntermediateTuple(tupleFactory));
         }
       }
-
+      System.out.println("finish exec of Initial with id: " + id + 
+              ", input: " + samples + 
+              ", output: " + tupleFactory.newTuple(output) +
+              ", preConsider reservoir size:" + preConsiderReservoirSize +
+              ", output reservoir size: " + getReservoir().size());
       return tupleFactory.newTuple(output);
     }
     
@@ -239,10 +252,14 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     private Reservoir reservoir;
     TupleFactory tupleFactory = TupleFactory.getInstance();
     
+    private long id;
+    
     public Intermediate(){}
     
     public Intermediate(String numSamples)
     {
+     id = System.nanoTime();
+     System.out.println("Create intermed, numSamples: " + numSamples + ", id: " + id);
       this.numSamples = Integer.parseInt(numSamples);
     }
     
@@ -257,21 +274,30 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     @Override
     public Tuple exec(Tuple input) throws IOException {
       DataBag bagOfSamples = (DataBag) input.get(0);
+      //Reservoir r = new Reservoir(this.numSamples); //the working way
+      Reservoir r = getReservoir();
+      
+      int preConsiderReservoirSize = r.size();
+      
       for (Tuple innerTuple : bagOfSamples) {
         DataBag samples = (DataBag) innerTuple.get(0);        
         
         for (Tuple sample : samples) {
           // use the same score as previously generated
-          getReservoir().consider(ScoredTuple.fromIntermediateTuple(sample));
+          r.consider(ScoredTuple.fromIntermediateTuple(sample));
         }
       }
       
       DataBag output = BagFactory.getInstance().newDefaultBag();
-      for (ScoredTuple scoredTuple : getReservoir()) {
+      for (ScoredTuple scoredTuple : r) {
         // add the score on to the intermediate tuple
         output.add(scoredTuple.getIntermediateTuple(tupleFactory));
       }
-
+      System.out.println("enter exec of Intermed with id: " + id +
+              ", input: " + bagOfSamples + 
+              ", output: " + tupleFactory.newTuple(output) + 
+              ", pre consider reservoir size: " + preConsiderReservoirSize +
+              ", output reservoir size: " + r.size());
       return tupleFactory.newTuple(output);
     }
     
@@ -283,10 +309,14 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     private Reservoir reservoir;
     TupleFactory tupleFactory = TupleFactory.getInstance();
     
+    private long id;
+    
     public Final(){}
     
     public Final(String numSamples)
     {
+     id = System.nanoTime();
+     System.out.println("Create final, numSamples: " + numSamples + ", id: " + id);
       this.numSamples = Integer.parseInt(numSamples);
     }
     
@@ -301,20 +331,32 @@ public class ReservoirSample extends AccumulatorEvalFunc<DataBag> implements Alg
     @Override
     public DataBag exec(Tuple input) throws IOException {
       DataBag bagOfSamples = (DataBag) input.get(0);
+      
+      //Reservoir r = new Reservoir(this.numSamples);  //the working way
+      Reservoir r = getReservoir();
+      
+      int preConsiderReservoirSize = r.size();
+      
       for (Tuple innerTuple : bagOfSamples) {
-        DataBag samples = (DataBag) innerTuple.get(0);        
-        
+        DataBag samples = (DataBag) innerTuple.get(0);   
+                
         for (Tuple sample : samples) {
           // use the same score as previously generated
-          getReservoir().consider(ScoredTuple.fromIntermediateTuple(sample));
+          r.consider(ScoredTuple.fromIntermediateTuple(sample));
         }
       }
       
       DataBag output = BagFactory.getInstance().newDefaultBag();  
-      for (ScoredTuple scoredTuple : getReservoir()) {
+      for (ScoredTuple scoredTuple : r) {
         // output the original tuple
         output.add(scoredTuple.getTuple());
       }
+      
+      System.out.println("enter exec of Final with id: " + id +
+              ", input: " + bagOfSamples +
+              ", output: " + output + 
+              ", preConsider reservoir size: " + preConsiderReservoirSize +
+              ", output reservoir size: " + r.size());
 
       return output;
     }    

Test Pig script:

register datafu-1.2.1-SNAPSHOT.jar;

DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('2');
data = LOAD 'input.txt' USING PigStorage(',') AS (key: chararray, value: chararray);
grouped = GROUP data BY key;
sample2 = FOREACH grouped GENERATE ReservoirSample(data);
dump sample2;

all_grouped = GROUP data ALL;
sample3 = FOREACH all_grouped GENERATE ReservoirSample(data);
dump sample3;

The debug output:

Create initial instance, numSamples: 2, id: 1390057931816676000
Create intermed, numSamples: 2, id: 1390057931821523000
Create final, numSamples: 2, id: 1390057931822896000

Create initial instance, numSamples: 2, id: 1390057932290560000
finish exec of Initial with id: 1390057932290560000, input: {(a1,5)}, output: ({(0.881896844845867,(a1,5))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a1,6)}, output: ({(0.7843315204002389,(a1,6))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a1,7)}, output: ({(0.8355896762512851,(a1,7))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a2,5)}, output: ({(0.34698641636909844,(a2,5))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a2,6)}, output: ({(0.8827556451749948,(a2,6))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057932290560000, input: {(a2,7)}, output: ({(0.5957191900640255,(a2,7))}), preConsider reservoir size:0, output reservoir size: 0

Create intermed, numSamples: 2, id: 1390057932320244000
enter exec of Intermed with id: 1390057932320244000, input: {({(0.881896844845867,(a1,5))}),({(0.7843315204002389,(a1,6))}),({(0.8355896762512851,(a1,7))})}, output: ({(0.8355896762512851,(a1,7)),(0.881896844845867,(a1,5))}), pre consider reservoir size: 0, output reservoir size: 2
enter exec of Intermed with id: 1390057932320244000, input: {({(0.34698641636909844,(a2,5))}),({(0.8827556451749948,(a2,6))}),({(0.5957191900640255,(a2,7))})}, output: ({(0.881896844845867,(a1,5)),(0.8827556451749948,(a2,6))}), pre consider reservoir size: 2, output reservoir size: 2

Create final, numSamples: 2, id: 1390057935230757000
Create final, numSamples: 2, id: 1390057935235201000
enter exec of Final with id: 1390057935230757000, input: {({(0.8355896762512851,(a1,7)),(0.881896844845867,(a1,5))})}, output: {(a1,7),(a1,5)}, preConsider reservoir size: 0, output reservoir size: 2
enter exec of Final with id: 1390057935230757000, input: {({(0.881896844845867,(a1,5)),(0.8827556451749948,(a2,6))})}, output: {(a1,5),(a2,6)}, preConsider reservoir size: 2, output reservoir size: 2

({(a1,7),(a1,5)})
({(a1,5),(a2,6)})

Create initial instance, numSamples: 2, id: 1390057938566442000
Create intermed, numSamples: 2, id: 1390057938566921000
Create final, numSamples: 2, id: 1390057938567202000
Create initial instance, numSamples: 2, id: 1390057938763658000

finish exec of Initial with id: 1390057938763658000, input: {(a1,5)}, output: ({(0.3309669492690237,(a1,5))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a1,6)}, output: ({(0.9869182911655885,(a1,6))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a1,7)}, output: ({(0.02294962522144972,(a1,7))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a2,5)}, output: ({(0.8470840606471902,(a2,5))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a2,6)}, output: ({(0.00556133574224793,(a2,6))}), preConsider reservoir size:0, output reservoir size: 0
finish exec of Initial with id: 1390057938763658000, input: {(a2,7)}, output: ({(0.8134381033586324,(a2,7))}), preConsider reservoir size:0, output reservoir size: 0

Create intermed, numSamples: 2, id: 1390057938777755000
enter exec of Intermed with id: 1390057938777755000, input: {({(0.3309669492690237,(a1,5))}),({(0.9869182911655885,(a1,6))}),({(0.02294962522144972,(a1,7))}),({(0.8470840606471902,(a2,5))}),({(0.00556133574224793,(a2,6))}),({(0.8134381033586324,(a2,7))})}, output: ({(0.8470840606471902,(a2,5)),(0.9869182911655885,(a1,6))}), pre consider reservoir size: 0, output reservoir size: 2

Create final, numSamples: 2, id: 1390057941698739000
Create final, numSamples: 2, id: 1390057941701617000

enter exec of Final with id: 1390057941698739000, input: {({(0.8470840606471902,(a2,5)),(0.9869182911655885,(a1,6))})}, output: {(a2,5),(a1,6)}, preConsider reservoir size: 0, output reservoir size: 2
({(a2,5),(a1,6)})
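The GROUP ALL run is a useful cross-check. With a single group, each Intermed and Final instance runs exec() only once, starting from an empty reservoir (`pre consider reservoir size: 0` in the log). The shared field therefore has nothing to leak. The hypothetical `topK` helper below assumes, as in the log, that the reservoir keeps the highest scores. It applies that rule to the six scores from this run and reproduces the logged Final output {(a2,5),(a1,6)}.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical cross-check of the GROUP ALL run; not DataFu code.
public class GroupAllDemo {
    // A tuple (kept as its printed form) paired with its random score.
    public record Scored(double score, String tuple) {}

    // Scores copied from the GROUP ALL run in the debug output.
    public static final List<Scored> ALL = List.of(
        new Scored(0.3309669492690237, "(a1,5)"),
        new Scored(0.9869182911655885, "(a1,6)"),
        new Scored(0.02294962522144972, "(a1,7)"),
        new Scored(0.8470840606471902, "(a2,5)"),
        new Scored(0.00556133574224793, "(a2,6)"),
        new Scored(0.8134381033586324, "(a2,7)"));

    // Keeps the k highest-scored tuples, starting from an empty reservoir,
    // which is what a single exec() call per instance amounts to.
    public static List<String> topK(List<Scored> input, int k) {
        PriorityQueue<Scored> reservoir =
            new PriorityQueue<>(Comparator.comparingDouble(Scored::score));
        for (Scored s : input) {
            reservoir.add(s);
            if (reservoir.size() > k) {
                reservoir.poll(); // evict the lowest score
            }
        }
        List<Scored> kept = new ArrayList<>(reservoir);
        kept.sort(Comparator.comparingDouble(Scored::score));
        List<String> out = new ArrayList<>();
        for (Scored s : kept) {
            out.add(s.tuple());
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(topK(ALL, 2)); // [(a2,5), (a1,6)]
    }
}
```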


was (Author: king821221): the same comment, except that the example at the end of the fourth paragraph referred to the input sample "of an Initial instance" rather than to a single invocation of Initial's exec().

> ReservoirSample does not behave as expected when grouping by a key other than ALL
> ---------------------------------------------------------------------------------
>
>                 Key: DATAFU-11
>                 URL: https://issues.apache.org/jira/browse/DATAFU-11
>             Project: DataFu
>          Issue Type: Bug
>            Reporter: Will Vaughan
>
> Reported by Barbara Mucha ([Issue #92 on GitHub|https://github.com/linkedin/datafu/issues/92]):
> ReservoirSample does not behave as expected when grouping by a key other than ALL.
> It appears that the sample is taken over the full input instead of over each group's input.
> Given input:
> {noformat}
> a1,5
> a1,6
> a1,7
> a2,5
> a2,6
> a2,7
> {noformat}
> with the following program
> {noformat}
> DEFINE ReservoirSample datafu.pig.sampling.ReservoirSample('2');
> data = LOAD 'input.txt' USING PigStorage(',') AS (key: chararray, value: chararray);
> grouped = GROUP data BY key;
> sample2 = FOREACH grouped GENERATE ReservoirSample(data);
> {noformat}
> the expected output should be similar to
> {noformat}
> (a1, {(a1,5),(a1,7)})
> (a2, {(a2,5),(a2,7)})
> {noformat}
> However, actual output may show up as
> {noformat}
> (a1, {(a1,5),(a1,7)})
> (a2, {(a1,5),(a1,7)})
> {noformat}
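The behaviour the report expects can be sketched outside Pig for reference. Each input line gets a random score, and the two highest scores are kept separately per key. This is a hypothetical reference model, and `samplePerKey` is an illustrative name. Which lines get picked depends on the random seed, so only the shape of the result is fixed: two tuples per key, each carrying that key.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Random;
import java.util.TreeMap;

// Hypothetical reference model of per-group sampling; not DataFu code.
public class PerKeySampleDemo {
    // A tuple (kept as its printed form) paired with its random score.
    public record Scored(double score, String tuple) {}

    // One independent top-k reservoir per key, where the key is the first
    // comma-separated field of each input line.
    public static Map<String, List<String>> samplePerKey(List<String> lines, int k, Random rnd) {
        Map<String, PriorityQueue<Scored>> reservoirs = new TreeMap<>();
        for (String line : lines) {
            String key = line.split(",", 2)[0];
            PriorityQueue<Scored> r = reservoirs.computeIfAbsent(key,
                unused -> new PriorityQueue<Scored>(Comparator.comparingDouble(Scored::score)));
            r.add(new Scored(rnd.nextDouble(), "(" + line + ")"));
            if (r.size() > k) {
                r.poll(); // evict the lowest score within this key only
            }
        }
        Map<String, List<String>> out = new TreeMap<>();
        for (Map.Entry<String, PriorityQueue<Scored>> e : reservoirs.entrySet()) {
            List<String> tuples = new ArrayList<>();
            for (Scored s : e.getValue()) {
                tuples.add(s.tuple());
            }
            out.put(e.getKey(), tuples);
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> input = List.of("a1,5", "a1,6", "a1,7", "a2,5", "a2,6", "a2,7");
        // Seeded only so a run is repeatable; the chosen tuples depend on the seed.
        System.out.println(samplePerKey(input, 2, new Random(42)));
    }
}
```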



--
This message was sent by Atlassian JIRA
(v6.1.5#6160)