You are viewing a plain-text version of this content; the link to the canonical version is not included in this text.
Posted to commits@datafu.apache.org by mh...@apache.org on 2014/01/28 02:39:14 UTC

[1/2] git commit: DATAFU-5 update SimpleRandomSample to be consistent with SimpleRandomSampleWithReplacement

Updated Branches:
  refs/heads/master 424e3b485 -> 41a0c2ccf


DATAFU-5 update SimpleRandomSample to be consistent with SimpleRandomSampleWithReplacement

https://issues.apache.org/jira/browse/DATAFU-5

Signed-off-by: Matt Hayes <mh...@linkedin.com>


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/a4b17da9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/a4b17da9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/a4b17da9

Branch: refs/heads/master
Commit: a4b17da901968a4bb704e4d04aa9a6e53baddf29
Parents: 38a670e
Author: Xiangrui Meng <me...@databricks.com>
Authored: Mon Jan 27 10:48:19 2014 -0800
Committer: Matt Hayes <mh...@linkedin.com>
Committed: Mon Jan 27 10:48:31 2014 -0800

----------------------------------------------------------------------
 .../datafu/pig/sampling/SimpleRandomSample.java | 386 +++++++++++++------
 .../pig/sampling/SimpleRandomSampleTest.java    |  74 +++-
 .../pig/sampling/SimpleRandomSampleTestOld.java | 176 +++++++++
 3 files changed, 508 insertions(+), 128 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/a4b17da9/src/java/datafu/pig/sampling/SimpleRandomSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/SimpleRandomSample.java b/src/java/datafu/pig/sampling/SimpleRandomSample.java
index aff088a..8e8debf 100644
--- a/src/java/datafu/pig/sampling/SimpleRandomSample.java
+++ b/src/java/datafu/pig/sampling/SimpleRandomSample.java
@@ -34,7 +34,7 @@ import org.apache.pig.impl.logicalLayer.FrontendException;
 import org.apache.pig.impl.logicalLayer.schema.Schema;
 
 /**
- * Scalable simple random sampling.
+ * Scalable simple random sampling (ScaSRS).
  * <p/>
  * This UDF implements a scalable simple random sampling algorithm described in
  * 
@@ -42,45 +42,78 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
  * X. Meng, Scalable Simple Random Sampling and Stratified Sampling, ICML 2013.
  * </pre>
  * 
- * It takes a sampling probability p as input and outputs a simple random sample of size
- * exactly ceil(p*n) with probability at least 99.99%, where $n$ is the size of the
- * population. This UDF is very useful for stratified sampling. For example,
+ * It takes a bag of n items and a sampling probability p as the inputs, and outputs a
+ * simple random sample of size exactly ceil(p*n) in a bag, with probability at least
+ * 99.99%. For example, the following script generates a simple random sample with
+ * sampling probability 0.01:
  * 
  * <pre>
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('0.01');
- * examples = LOAD ...
- * grouped = GROUP examples BY label;
- * sampled = FOREACH grouped GENERATE FLATTEN(SRS(examples));
- * STORE sampled ...
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ * 
+ * item    = LOAD 'input' AS (x:double); 
+ * sampled = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRS(item, 0.01));
+ * </pre>
+ * 
+ * Optionally, the user can provide a good lower bound of n as the third argument to help
+ * reduce the size of intermediate data, for example:
+ * 
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ * 
+ * item    = LOAD 'input' AS (x:double); 
+ * summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
+ * sampled = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRS(item, 0.01, summary.count));
  * </pre>
  * 
- * We note that, in a Java Hadoop job, we can output pre-selected records directly using
+ * This UDF is very useful for stratified sampling. For example, the following script
+ * keeps all positive examples while downsampling negatives with probability 0.1:
+ * 
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ * 
+ * item    = LOAD 'input' AS (x:double, label:int);
+ * grouped = FOREACH (GROUP item BY label) GENERATE item, (group == 1 ? 1.0 : 0.1) AS p; 
+ * sampled = FOREACH grouped GENERATE FLATTEN(SRS(item, p));
+ * </pre>
+ * 
+ * In a Java Hadoop MapReduce job, we can output selected items directly using
  * MultipleOutputs. However, this feature is not available in a Pig UDF. So we still let
- * pre-selected records go through the sort phase. However, as long as the sample size is
- * not huge, this should not be a big problem.
+ * selected items go through the sort phase. However, as long as the sample size is not
+ * huge, this should not be a big problem.
+ * 
+ * In the first version, the sampling probability was specified in the constructor. This
+ * usage is now deprecated and will be removed in the next release.
  * 
  * @author ximeng
  * 
  */
 public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
 {
-  private static final TupleFactory tupleFactory = TupleFactory.getInstance();
-  private static final BagFactory bagFactory = BagFactory.getInstance();
+  /**
+   * Prefix for the output bag name.
+   */
+  public static final String OUTPUT_BAG_NAME_PREFIX = "SRS";
+
+  private static final TupleFactory _TUPLE_FACTORY = TupleFactory.getInstance();
+  private static final BagFactory _BAG_FACTORY = BagFactory.getInstance();
 
   public SimpleRandomSample()
   {
+    // empty
   }
 
+  /**
+   * Constructs this UDF with a sampling probability.
+   * 
+   * @deprecated Should specify the sampling probability in the function call.
+   */
+  @Deprecated
   public SimpleRandomSample(String samplingProbability)
   {
-    Double p = Double.parseDouble(samplingProbability);
-
-    if (p < 0.0 || p > 1.0)
-    {
-      throw new IllegalArgumentException("Sampling probability must be inside [0, 1].");
-    }
+	double p = Double.parseDouble(samplingProbability);
+	verifySamplingProbability(p);
   }
-
+  
   @Override
   public String getInitial()
   {
@@ -111,68 +144,146 @@ public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
         throw new RuntimeException("Expected a BAG as input");
       }
 
-      return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
-                                                                 .getName()
-                                                                 .toLowerCase(), input),
+      return new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX,
+                                                                   input),
                                                inputFieldSchema.schema,
                                                DataType.BAG));
     }
     catch (FrontendException e)
     {
-      e.printStackTrace();
       throw new RuntimeException(e);
     }
   }
 
   static public class Initial extends EvalFunc<Tuple>
   {
-    private double _samplingProbability;
-    private RandomDataImpl _rdg = new RandomDataImpl();
+    // Should avoid creating many random number generator instances.
+    private static RandomDataImpl _RNG = new RandomDataImpl();
+
+    synchronized private static double nextDouble()
+    {
+      return _RNG.nextUniform(0.0d, 1.0d);
+    }
 
     public Initial()
     {
+      // empty
     }
 
+    @Deprecated
     public Initial(String samplingProbability)
     {
-      _samplingProbability = Double.parseDouble(samplingProbability);
+      _p = Double.parseDouble(samplingProbability);
     }
+    
+    private boolean _first = true;
+    private double _p = -1.0d; // the sampling probability
+    private long _n1 = 0L; // the input lower bound of the size of the population
+    private long _localCount = 0L; // number of items processed by this instance
 
     @Override
     public Tuple exec(Tuple input) throws IOException
     {
-      Tuple output = tupleFactory.newTuple();
-      DataBag selected = bagFactory.newDefaultBag();
-      DataBag waiting = bagFactory.newSortedBag(new ScoredTupleComparator());
+      int numArgs = input.size();
+      
+      // The first if clause is for backward compatibility, which should be removed 
+      // after we remove specifying sampling probability in the constructor.
+      if(numArgs == 1)
+      {
+        if(_p < 0.0d)
+        {
+          throw new IllegalArgumentException("Sampling probability is not given.");
+        }
+      }
+      else if (numArgs < 2 || numArgs > 3)
+      {
+        throw new IllegalArgumentException("The input tuple should have either two or three fields: "
+            + "a bag of items, the sampling probability, "
+            + "and optionally a good lower bound of the size of the population or the exact number.");
+      }
 
       DataBag items = (DataBag) input.get(0);
-
-      if (items != null)
+      long numItems = items.size();
+      _localCount += numItems;
+
+      // This is also for backward compatibility. Should change to
+      // double p = ((Number) input.get(1)).doubleValue();
+      // after we remove specifying sampling probability in the constructor.
+      double p = numArgs == 1 ? _p : ((Number) input.get(1)).doubleValue();
+      if (_first)
+      {
+        _p = p;
+        verifySamplingProbability(p);
+      }
+      else
       {
-        long n = items.size();
+        if (p != _p)
+        {
+          throw new IllegalArgumentException("The sampling probability must be a scalar, but found two different values: "
+              + _p + " and " + p + ".");
+        }
+      }
 
-        double q1 = getQ1(n, _samplingProbability);
-        double q2 = getQ2(n, _samplingProbability);
+      long n1 = 0L;
+      if (numArgs > 2)
+      {
+        n1 = ((Number) input.get(2)).longValue();
 
-        for (Tuple item : items)
+        if (_first)
+        {
+          _n1 = n1;
+        }
+        else
         {
-          double key = _rdg.nextUniform(0.0d, 1.0d);
+          if (n1 != _n1)
+          {
+            throw new IllegalArgumentException("The lower bound of the population size must be a scalar, but found two different values: "
+                + _n1 + " and " + n1 + ".");
+          }
+        }
+      }
+
+      _first = false;
+
+      // Use the local count if the input lower bound is smaller.
+      n1 = Math.max(n1, _localCount);
+
+      DataBag selected = _BAG_FACTORY.newDefaultBag();
+      DataBag waiting = _BAG_FACTORY.newDefaultBag();
 
-          if (key < q1)
+      if (n1 > 0L)
+      {
+        double q1 = getQ1(n1, p);
+        double q2 = getQ2(n1, p);
+
+        for (Tuple t : items)
+        {
+          double x = nextDouble();
+          if (x < q1)
           {
-            selected.add(item);
+            selected.add(t);
           }
-          else if (key < q2)
+          else if (x < q2)
           {
-            waiting.add(new ScoredTuple(key, item).getIntermediateTuple(tupleFactory));
+            waiting.add(new ScoredTuple(x, t).getIntermediateTuple(_TUPLE_FACTORY));
           }
         }
-
-        output.append(n);
-        output.append(selected);
-        output.append(waiting);
       }
 
+      /*
+       * The output tuple contains the following fields: sampling probability (double),
+       * number of processed items in this tuple (long), a good lower bound of the size of
+       * the population or the exact number (long), a bag of selected items (bag), and a
+       * bag of waitlisted items with scores (bag).
+       */
+      Tuple output = _TUPLE_FACTORY.newTuple();
+
+      output.append(p);
+      output.append(numItems);
+      output.append(n1);
+      output.append(selected);
+      output.append(waiting);
+
       return output;
     }
   }
@@ -181,36 +292,51 @@ public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
   {
     public Intermediate()
     {
+      // empty
     }
-
+    
+    @Deprecated
     public Intermediate(String samplingProbability)
     {
-      _samplingProbability = Double.parseDouble(samplingProbability);
+      // empty
     }
 
-    private double _samplingProbability;
-
     @Override
     public Tuple exec(Tuple input) throws IOException
     {
       DataBag bag = (DataBag) input.get(0);
-      DataBag selected = bagFactory.newDefaultBag();
-      DataBag aggWaiting = bagFactory.newSortedBag(new ScoredTupleComparator());
-      DataBag waiting = bagFactory.newSortedBag(new ScoredTupleComparator());
-      Tuple output = tupleFactory.newTuple();
 
-      long n = 0L;
+      DataBag selected = _BAG_FACTORY.newDefaultBag();
+      DataBag aggWaiting = _BAG_FACTORY.newDefaultBag();
 
-      for (Tuple innerTuple : bag)
+      boolean first = true;
+      double p = 0.0d;
+      long numItems = 0L; // number of items processed, including rejected
+      long n1 = 0L;
+
+      for (Tuple tuple : bag)
       {
-        n += (Long) innerTuple.get(0);
+        if (first)
+        {
+          p = (Double) tuple.get(0);
+          first = false;
+        }
+
+        numItems += (Long) tuple.get(1);
+        n1 = Math.max((Long) tuple.get(2), numItems);
+
+        selected.addAll((DataBag) tuple.get(3));
+        aggWaiting.addAll((DataBag) tuple.get(4));
+      }
 
-        selected.addAll((DataBag) innerTuple.get(1));
+      DataBag waiting = _BAG_FACTORY.newDefaultBag();
 
-        double q1 = getQ1(n, _samplingProbability);
-        double q2 = getQ2(n, _samplingProbability);
+      if (n1 > 0L)
+      {
+        double q1 = getQ1(n1, p);
+        double q2 = getQ2(n1, p);
 
-        for (Tuple t : (DataBag) innerTuple.get(2))
+        for (Tuple t : aggWaiting)
         {
           ScoredTuple scored = ScoredTuple.fromIntermediateTuple(t);
 
@@ -220,94 +346,129 @@ public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
           }
           else if (scored.getScore() < q2)
           {
-            aggWaiting.add(t);
-          }
-          else
-          {
-            break;
+            waiting.add(t);
           }
         }
       }
 
-      double q1 = getQ1(n, _samplingProbability);
-      double q2 = getQ2(n, _samplingProbability);
+      Tuple output = _TUPLE_FACTORY.newTuple();
 
-      for (Tuple t : aggWaiting)
-      {
-        ScoredTuple scored = ScoredTuple.fromIntermediateTuple(t);
-
-        if (scored.getScore() < q1)
-        {
-          selected.add(scored.getTuple());
-        }
-        else if (scored.getScore() < q2)
-        {
-          waiting.add(t);
-        }
-        else
-        {
-          break;
-        }
-      }
-
-      output.append(n);
+      output.append(p);
+      output.append(numItems);
+      output.append(n1);
       output.append(selected);
       output.append(waiting);
 
-      System.err.println("Read " + n + " items, selected " + selected.size()
-          + ", and wait-listed " + aggWaiting.size() + ".");
-
       return output;
     }
   }
 
   static public class Final extends EvalFunc<DataBag>
   {
-    private double _samplingProbability;
-
     public Final()
     {
+      // empty
     }
 
+    @Deprecated
     public Final(String samplingProbability)
     {
-      _samplingProbability = Double.parseDouble(samplingProbability);
+      // empty
     }
-
+    
     @Override
     public DataBag exec(Tuple input) throws IOException
     {
       DataBag bag = (DataBag) input.get(0);
-      long n = 0L;
-      DataBag selected = bagFactory.newDefaultBag();
-      DataBag waiting = bagFactory.newSortedBag(new ScoredTupleComparator());
 
-      for (Tuple innerTuple : bag)
+      boolean first = true;
+      double p = 0.0d; // the sampling probability
+      long n = 0L; // the size of the population (total number of items)
+
+      DataBag selected = _BAG_FACTORY.newDefaultBag();
+      DataBag waiting = _BAG_FACTORY.newSortedBag(ScoredTupleComparator.getInstance());
+
+      for (Tuple tuple : bag)
       {
-        n += (Long) innerTuple.get(0);
-        selected.addAll((DataBag) innerTuple.get(1));
-        waiting.addAll((DataBag) innerTuple.get(2));
+        if (first)
+        {
+          p = (Double) tuple.get(0);
+          first = false;
+        }
+
+        n += (Long) tuple.get(1);
+        selected.addAll((DataBag) tuple.get(3));
+        waiting.addAll((DataBag) tuple.get(4));
       }
 
-      long sampleSize = (long) Math.ceil(_samplingProbability * n);
-      long nNeeded = sampleSize - selected.size();
+      long numSelected = selected.size();
+      long numWaiting = waiting.size();
+
+      long s = (long) Math.ceil(p * n); // sample size
+
+      System.out.println("To sample " + s + " items from " + n + ", we pre-selected "
+          + numSelected + ", and waitlisted " + waiting.size() + ".");
+
+      long numNeeded = s - selected.size();
+
+      if (numNeeded < 0)
+      {
+        System.err.println("Pre-selected " + numSelected + " items, but only needed " + s
+            + ".");
+      }
 
       for (Tuple scored : waiting)
       {
-        if (nNeeded <= 0)
+        if (numNeeded <= 0)
         {
           break;
         }
         selected.add(ScoredTuple.fromIntermediateTuple(scored).getTuple());
-        nNeeded--;
+        numNeeded--;
+      }
+
+      if (numNeeded > 0)
+      {
+        System.err.println("The waiting list only has " + numWaiting
+            + " items, but needed " + numNeeded + " more.");
       }
 
       return selected;
     }
   }
 
-  private static class ScoredTupleComparator implements Comparator<Tuple>
+  // computes a threshold to select items
+  private static double getQ1(long n, double p)
+  {
+    double t1 = 20.0d / (3.0d * n);
+    double q1 = p + t1 - Math.sqrt(t1 * t1 + 3.0d * t1 * p);
+    return q1;
+  }
+
+  // computes a threshold to reject items
+  private static double getQ2(long n, double p)
+  {
+    double t2 = 10.0d / n;
+    double q2 = p + t2 + Math.sqrt(t2 * t2 + 2.0d * t2 * p);
+    return q2;
+  }
+  
+  private static void verifySamplingProbability(double p)
+  {
+	if(p < 0.0 || p > 1.0) 
+	{
+	  throw new IllegalArgumentException("Sampling probabiilty must be inside [0, 1].");
+	}
+  }
+
+  static class ScoredTupleComparator implements Comparator<Tuple>
   {
+    public static final ScoredTupleComparator getInstance()
+    {
+      return _instance;
+    }
+
+    private static final ScoredTupleComparator _instance = new ScoredTupleComparator();
 
     @Override
     public int compare(Tuple o1, Tuple o2)
@@ -325,17 +486,4 @@ public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
     }
   }
 
-  private static double getQ1(long n, double p)
-  {
-    double t1 = 20.0 / (3.0 * n);
-    double q1 = p + t1 - Math.sqrt(t1 * t1 + 3.0 * t1 * p);
-    return q1;
-  }
-
-  private static double getQ2(long n, double p)
-  {
-    double t2 = 10.0 / n;
-    double q2 = p + t2 + Math.sqrt(t2 * t2 + 2.0 * t2 * p);
-    return q2;
-  }
 }

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/a4b17da9/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
index 7a0ced2..bd34881 100644
--- a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
+++ b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
@@ -37,11 +37,11 @@ public class SimpleRandomSampleTest extends PigTests
   /**
    * register $JAR_PATH
    * 
-   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
    * 
    * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
    * 
-   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data) as sample_data;
+   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p) as sample_data;
    * 
    * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
    * 
@@ -50,8 +50,42 @@ public class SimpleRandomSampleTest extends PigTests
   @Multiline
   private String simpleRandomSampleTest;
 
+  /**
+   * register $JAR_PATH
+   * 
+   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+   * 
+   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+   * 
+   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p, $n1) as sample_data;
+   * 
+   * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
+   * 
+   * STORE sampled INTO 'output';
+   */
+  @Multiline
+  private String simpleRandomSampleWithLowerBoundTest;
+
+  /**
+   * register $JAR_PATH
+   * 
+   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+   * 
+   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+   * 
+   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p1) as sample_1, SRS(data,
+   * $p2) AS sample_2;
+   * 
+   * sampled = FOREACH sampled GENERATE COUNT(sample_1) AS sample_count_1, COUNT(sample_2)
+   * AS sample_count_2;
+   * 
+   * STORE sampled INTO 'output';
+   */
+  @Multiline
+  private String simpleRandomSampleWithTwoCallsTest;
+
   @Test
-  public void simpleRandomSampleTest() throws Exception
+  public void testSimpleRandomSample() throws Exception
   {
     writeLinesToFile("input",
                      "A1\tB1\t1",
@@ -90,22 +124,42 @@ public class SimpleRandomSampleTest extends PigTests
     int n = 32;
     double p = 0.3;
     int s = (int) Math.ceil(p * n);
-    PigTest test =
-        createPigTestFromString(simpleRandomSampleTest, "SAMPLING_PROBABILITY=" + p);
 
+    PigTest test = createPigTestFromString(simpleRandomSampleTest, "p=" + p);
     test.runScript();
-
     assertOutput(test, "sampled", "(" + s + ")");
+
+    int n1 = 30;
+    PigTest testWithLB =
+        createPigTestFromString(simpleRandomSampleWithLowerBoundTest, "p=" + p, "n1="
+            + n1);
+    testWithLB.runScript();
+    assertOutput(testWithLB, "sampled", "(" + s + ")");
+
+    double p1 = 0.05;
+    double p2 = 0.95;
+    int s1 = (int) Math.ceil(p1 * n);
+    int s2 = (int) Math.ceil(p2 * n);
+
+    PigTest testWithTwoCalls =
+        createPigTestFromString(simpleRandomSampleWithTwoCallsTest, "p1=" + p1, "p2="
+            + p2);
+    testWithTwoCalls.runScript();
+    assertOutput(testWithTwoCalls, "sampled", "(" + s1 + "," + s2 + ")");
+
+    test.runScript();
+
   }
 
   /**
    * register $JAR_PATH
    * 
-   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
    * 
    * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
    * 
-   * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data) as sample_data;
+   * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data,
+   * $SAMPLING_PROBABILITY) as sample_data;
    * 
    * sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
    * 
@@ -117,7 +171,7 @@ public class SimpleRandomSampleTest extends PigTests
   private String stratifiedSampleTest;
 
   @Test
-  public void stratifiedSampleTest() throws Exception
+  public void testStratifiedSample() throws Exception
   {
     writeLinesToFile("input",
                      "A1\tB1\t1",
@@ -171,4 +225,6 @@ public class SimpleRandomSampleTest extends PigTests
                  "(A8,1)",
                  "(A9,2)");
   }
+  
+  
 }

http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/a4b17da9/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
new file mode 100644
index 0000000..15e1fd6
--- /dev/null
+++ b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sampling;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.pig.sampling.SimpleRandomSample;
+import datafu.test.pig.PigTests;
+
+/**
+ * Tests for {@link SimpleRandomSample}.
+ * 
+ * @deprecated This tests the deprecated functionality of SimpleRandomSample
+ *             where the probability can be specified in the constructor.  
+ * @author ximeng
+ * 
+ */
+public class SimpleRandomSampleTestOld extends PigTests
+{
+  /**
+   * register $JAR_PATH
+   * 
+   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+   * 
+   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+   * 
+   * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data) as sample_data;
+   * 
+   * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
+   * 
+   * STORE sampled INTO 'output';
+   */
+  @Multiline
+  private String simpleRandomSampleTest;
+
+  @Test
+  public void simpleRandomSampleTest() throws Exception
+  {
+    writeLinesToFile("input",
+                     "A1\tB1\t1",
+                     "A1\tB1\t4",
+                     "A1\tB3\t4",
+                     "A1\tB4\t4",
+                     "A2\tB1\t4",
+                     "A2\tB2\t4",
+                     "A3\tB1\t3",
+                     "A3\tB1\t1",
+                     "A3\tB3\t77",
+                     "A4\tB1\t3",
+                     "A4\tB2\t3",
+                     "A4\tB3\t59",
+                     "A4\tB4\t29",
+                     "A5\tB1\t4",
+                     "A6\tB2\t3",
+                     "A6\tB2\t55",
+                     "A6\tB3\t1",
+                     "A7\tB1\t39",
+                     "A7\tB2\t27",
+                     "A7\tB3\t85",
+                     "A8\tB1\t4",
+                     "A8\tB2\t45",
+                     "A9\tB3\t92",
+                     "A9\tB3\t0",
+                     "A9\tB6\t42",
+                     "A9\tB5\t1",
+                     "A10\tB1\t7",
+                     "A10\tB2\t23",
+                     "A10\tB2\t1",
+                     "A10\tB2\t31",
+                     "A10\tB6\t41",
+                     "A10\tB7\t52");
+
+    int n = 32;
+    double p = 0.3;
+    int s = (int) Math.ceil(p * n);
+    PigTest test =
+        createPigTestFromString(simpleRandomSampleTest, "SAMPLING_PROBABILITY=" + p);
+
+    test.runScript();
+
+    assertOutput(test, "sampled", "(" + s + ")");
+  }
+
+  /**
+   * register $JAR_PATH
+   * 
+   * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+   * 
+   * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+   * 
+   * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data) as sample_data;
+   * 
+   * sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
+   * 
+   * sampled = ORDER sampled BY group;
+   * 
+   * STORE sampled INTO 'output';
+   */
+  @Multiline
+  private String stratifiedSampleTest;
+
+  @Test
+  public void stratifiedSampleTest() throws Exception
+  {
+    writeLinesToFile("input",
+                     "A1\tB1\t1",
+                     "A1\tB1\t4",
+                     "A1\tB3\t4",
+                     "A1\tB4\t4",
+                     "A2\tB1\t4",
+                     "A2\tB2\t4",
+                     "A3\tB1\t3",
+                     "A3\tB1\t1",
+                     "A3\tB3\t77",
+                     "A4\tB1\t3",
+                     "A4\tB2\t3",
+                     "A4\tB3\t59",
+                     "A4\tB4\t29",
+                     "A5\tB1\t4",
+                     "A6\tB2\t3",
+                     "A6\tB2\t55",
+                     "A6\tB3\t1",
+                     "A7\tB1\t39",
+                     "A7\tB2\t27",
+                     "A7\tB3\t85",
+                     "A8\tB1\t4",
+                     "A8\tB2\t45",
+                     "A9\tB3\t92",
+                     "A9\tB3\t0",
+                     "A9\tB6\t42",
+                     "A9\tB5\t1",
+                     "A10\tB1\t7",
+                     "A10\tB2\t23",
+                     "A10\tB2\t1",
+                     "A10\tB2\t31",
+                     "A10\tB6\t41",
+                     "A10\tB7\t52");
+
+    double p = 0.5;
+
+    PigTest test =
+        createPigTestFromString(stratifiedSampleTest, "SAMPLING_PROBABILITY=" + p);
+    test.runScript();
+    assertOutput(test,
+                 "sampled",
+                 "(A1,2)",
+                 "(A10,3)",
+                 "(A2,1)",
+                 "(A3,2)",
+                 "(A4,2)",
+                 "(A5,1)",
+                 "(A6,2)",
+                 "(A7,2)",
+                 "(A8,1)",
+                 "(A9,2)");
+  }
+}


[2/2] git commit: Merge with master

Posted by mh...@apache.org.
Merge with master


Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/41a0c2cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/41a0c2cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/41a0c2cc

Branch: refs/heads/master
Commit: 41a0c2ccfc5e980b715d5d11d54eb673462de1a7
Parents: 424e3b4 a4b17da
Author: Matt Hayes <mh...@linkedin.com>
Authored: Mon Jan 27 17:37:05 2014 -0800
Committer: Matt Hayes <mh...@linkedin.com>
Committed: Mon Jan 27 17:38:22 2014 -0800

----------------------------------------------------------------------
 .../datafu/pig/sampling/SimpleRandomSample.java | 386 +++++++++++++------
 .../pig/sampling/SimpleRandomSampleTest.java    |  74 +++-
 .../pig/sampling/SimpleRandomSampleTestOld.java | 176 +++++++++
 3 files changed, 508 insertions(+), 128 deletions(-)
----------------------------------------------------------------------