You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@datafu.apache.org by mh...@apache.org on 2014/01/28 02:39:14 UTC
[1/2] git commit: DATAFU-5 update SimpleRandomSample to be consistent
with SimpleRandomSampleWithReplacement
Updated Branches:
refs/heads/master 424e3b485 -> 41a0c2ccf
DATAFU-5 update SimpleRandomSample to be consistent with SimpleRandomSampleWithReplacement
https://issues.apache.org/jira/browse/DATAFU-5
Signed-off-by: Matt Hayes <mh...@linkedin.com>
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/a4b17da9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/a4b17da9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/a4b17da9
Branch: refs/heads/master
Commit: a4b17da901968a4bb704e4d04aa9a6e53baddf29
Parents: 38a670e
Author: Xiangrui Meng <me...@databricks.com>
Authored: Mon Jan 27 10:48:19 2014 -0800
Committer: Matt Hayes <mh...@linkedin.com>
Committed: Mon Jan 27 10:48:31 2014 -0800
----------------------------------------------------------------------
.../datafu/pig/sampling/SimpleRandomSample.java | 386 +++++++++++++------
.../pig/sampling/SimpleRandomSampleTest.java | 74 +++-
.../pig/sampling/SimpleRandomSampleTestOld.java | 176 +++++++++
3 files changed, 508 insertions(+), 128 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/a4b17da9/src/java/datafu/pig/sampling/SimpleRandomSample.java
----------------------------------------------------------------------
diff --git a/src/java/datafu/pig/sampling/SimpleRandomSample.java b/src/java/datafu/pig/sampling/SimpleRandomSample.java
index aff088a..8e8debf 100644
--- a/src/java/datafu/pig/sampling/SimpleRandomSample.java
+++ b/src/java/datafu/pig/sampling/SimpleRandomSample.java
@@ -34,7 +34,7 @@ import org.apache.pig.impl.logicalLayer.FrontendException;
import org.apache.pig.impl.logicalLayer.schema.Schema;
/**
- * Scalable simple random sampling.
+ * Scalable simple random sampling (ScaSRS).
* <p/>
* This UDF implements a scalable simple random sampling algorithm described in
*
@@ -42,45 +42,78 @@ import org.apache.pig.impl.logicalLayer.schema.Schema;
* X. Meng, Scalable Simple Random Sampling and Stratified Sampling, ICML 2013.
* </pre>
*
- * It takes a sampling probability p as input and outputs a simple random sample of size
- * exactly ceil(p*n) with probability at least 99.99%, where $n$ is the size of the
- * population. This UDF is very useful for stratified sampling. For example,
+ * It takes a bag of n items and a sampling probability p as the inputs, and outputs a
+ * simple random sample of size exactly ceil(p*n) in a bag, with probability at least
+ * 99.99%. For example, the following script generates a simple random sample with
+ * sampling probability 0.01:
*
* <pre>
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('0.01');
- * examples = LOAD ...
- * grouped = GROUP examples BY label;
- * sampled = FOREACH grouped GENERATE FLATTEN(SRS(examples));
- * STORE sampled ...
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ *
+ * item = LOAD 'input' AS (x:double);
+ * sampled = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRS(item, 0.01));
+ * </pre>
+ *
+ * Optionally, the user can provide a good lower bound of n as the third argument to help
+ * reduce the size of intermediate data, for example:
+ *
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ *
+ * item = LOAD 'input' AS (x:double);
+ * summary = FOREACH (GROUP item ALL) GENERATE COUNT(item) AS count;
+ * sampled = FOREACH (GROUP item ALL) GENERATE FLATTEN(SRS(item, 0.01, summary.count));
* </pre>
*
- * We note that, in a Java Hadoop job, we can output pre-selected records directly using
+ * This UDF is very useful for stratified sampling. For example, the following script
+ * keeps all positive examples while downsampling negatives with probability 0.1:
+ *
+ * <pre>
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ *
+ * item = LOAD 'input' AS (x:double, label:int);
+ * grouped = FOREACH (GROUP item BY label) GENERATE item, (group == 1 ? 1.0 : 0.1) AS p;
+ * sampled = FOREACH grouped GENERATE FLATTEN(SRS(item, p));
+ * </pre>
+ *
+ * In a Java Hadoop MapReduce job, we can output selected items directly using
* MultipleOutputs. However, this feature is not available in a Pig UDF. So we still let
- * pre-selected records go through the sort phase. However, as long as the sample size is
- * not huge, this should not be a big problem.
+ * selected items go through the sort phase, but as long as the sample size is not
+ * huge, this should not be a big problem.
+ *
+ * In the first version, the sampling probability was specified in the constructor. This
+ * usage is now deprecated and will be removed in the next release.
*
* @author ximeng
*
*/
public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
{
- private static final TupleFactory tupleFactory = TupleFactory.getInstance();
- private static final BagFactory bagFactory = BagFactory.getInstance();
+ /**
+ * Prefix for the output bag name.
+ */
+ public static final String OUTPUT_BAG_NAME_PREFIX = "SRS";
+
+ private static final TupleFactory _TUPLE_FACTORY = TupleFactory.getInstance();
+ private static final BagFactory _BAG_FACTORY = BagFactory.getInstance();
public SimpleRandomSample()
{
+ // empty
}
+ /**
+ * Constructs this UDF with a sampling probability.
+ *
+ * @deprecated Pass the sampling probability as the second argument of the function call instead.
+ */
+ @Deprecated
public SimpleRandomSample(String samplingProbability)
{
- Double p = Double.parseDouble(samplingProbability);
-
- if (p < 0.0 || p > 1.0)
- {
- throw new IllegalArgumentException("Sampling probability must be inside [0, 1].");
- }
+ double p = Double.parseDouble(samplingProbability);
+ verifySamplingProbability(p);
}
-
+
@Override
public String getInitial()
{
@@ -111,68 +144,146 @@ public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
throw new RuntimeException("Expected a BAG as input");
}
- return new Schema(new Schema.FieldSchema(getSchemaName(this.getClass()
- .getName()
- .toLowerCase(), input),
+ return new Schema(new Schema.FieldSchema(super.getSchemaName(OUTPUT_BAG_NAME_PREFIX,
+ input),
inputFieldSchema.schema,
DataType.BAG));
}
catch (FrontendException e)
{
- e.printStackTrace();
throw new RuntimeException(e);
}
}
static public class Initial extends EvalFunc<Tuple>
{
- private double _samplingProbability;
- private RandomDataImpl _rdg = new RandomDataImpl();
+ // Should avoid creating many random number generator instances.
+ private static RandomDataImpl _RNG = new RandomDataImpl();
+
+ synchronized private static double nextDouble()
+ {
+ return _RNG.nextUniform(0.0d, 1.0d);
+ }
public Initial()
{
+ // empty
}
+ @Deprecated
public Initial(String samplingProbability)
{
- _samplingProbability = Double.parseDouble(samplingProbability);
+ _p = Double.parseDouble(samplingProbability);
}
+
+ private boolean _first = true;
+ private double _p = -1.0d; // the sampling probability
+ private long _n1 = 0L; // the input lower bound of the size of the population
+ private long _localCount = 0L; // number of items processed by this instance
@Override
public Tuple exec(Tuple input) throws IOException
{
- Tuple output = tupleFactory.newTuple();
- DataBag selected = bagFactory.newDefaultBag();
- DataBag waiting = bagFactory.newSortedBag(new ScoredTupleComparator());
+ int numArgs = input.size();
+
+ // The first if clause is for backward compatibility, which should be removed
+ // after we remove specifying sampling probability in the constructor.
+ if(numArgs == 1)
+ {
+ if(_p < 0.0d)
+ {
+ throw new IllegalArgumentException("Sampling probability is not given.");
+ }
+ }
+ else if (numArgs < 2 || numArgs > 3)
+ {
+ throw new IllegalArgumentException("The input tuple should have either two or three fields: "
+ + "a bag of items, the sampling probability, "
+ + "and optionally a good lower bound of the size of the population or the exact number.");
+ }
DataBag items = (DataBag) input.get(0);
-
- if (items != null)
+ long numItems = items.size();
+ _localCount += numItems;
+
+ // This is also for backward compatibility. Should change to
+ // double p = ((Number) input.get(1)).doubleValue();
+ // after we remove specifying sampling probability in the constructor.
+ double p = numArgs == 1 ? _p : ((Number) input.get(1)).doubleValue();
+ if (_first)
+ {
+ _p = p;
+ verifySamplingProbability(p);
+ }
+ else
{
- long n = items.size();
+ if (p != _p)
+ {
+ throw new IllegalArgumentException("The sampling probability must be a scalar, but found two different values: "
+ + _p + " and " + p + ".");
+ }
+ }
- double q1 = getQ1(n, _samplingProbability);
- double q2 = getQ2(n, _samplingProbability);
+ long n1 = 0L;
+ if (numArgs > 2)
+ {
+ n1 = ((Number) input.get(2)).longValue();
- for (Tuple item : items)
+ if (_first)
+ {
+ _n1 = n1;
+ }
+ else
{
- double key = _rdg.nextUniform(0.0d, 1.0d);
+ if (n1 != _n1)
+ {
+ throw new IllegalArgumentException("The lower bound of the population size must be a scalar, but found two different values: "
+ + _n1 + " and " + n1 + ".");
+ }
+ }
+ }
+
+ _first = false;
+
+ // Use the local count if the input lower bound is smaller.
+ n1 = Math.max(n1, _localCount);
+
+ DataBag selected = _BAG_FACTORY.newDefaultBag();
+ DataBag waiting = _BAG_FACTORY.newDefaultBag();
- if (key < q1)
+ if (n1 > 0L)
+ {
+ double q1 = getQ1(n1, p);
+ double q2 = getQ2(n1, p);
+
+ for (Tuple t : items)
+ {
+ double x = nextDouble();
+ if (x < q1)
{
- selected.add(item);
+ selected.add(t);
}
- else if (key < q2)
+ else if (x < q2)
{
- waiting.add(new ScoredTuple(key, item).getIntermediateTuple(tupleFactory));
+ waiting.add(new ScoredTuple(x, t).getIntermediateTuple(_TUPLE_FACTORY));
}
}
-
- output.append(n);
- output.append(selected);
- output.append(waiting);
}
+ /*
+ * The output tuple contains the following fields: sampling probability (double),
+ * number of processed items in this tuple (long), a good lower bound of the size of
+ * the population or the exact number (long), a bag of selected items (bag), and a
+ * bag of waitlisted items with scores (bag).
+ */
+ Tuple output = _TUPLE_FACTORY.newTuple();
+
+ output.append(p);
+ output.append(numItems);
+ output.append(n1);
+ output.append(selected);
+ output.append(waiting);
+
return output;
}
}
@@ -181,36 +292,51 @@ public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
{
public Intermediate()
{
+ // empty
}
-
+
+ @Deprecated
public Intermediate(String samplingProbability)
{
- _samplingProbability = Double.parseDouble(samplingProbability);
+ // empty
}
- private double _samplingProbability;
-
@Override
public Tuple exec(Tuple input) throws IOException
{
DataBag bag = (DataBag) input.get(0);
- DataBag selected = bagFactory.newDefaultBag();
- DataBag aggWaiting = bagFactory.newSortedBag(new ScoredTupleComparator());
- DataBag waiting = bagFactory.newSortedBag(new ScoredTupleComparator());
- Tuple output = tupleFactory.newTuple();
- long n = 0L;
+ DataBag selected = _BAG_FACTORY.newDefaultBag();
+ DataBag aggWaiting = _BAG_FACTORY.newDefaultBag();
- for (Tuple innerTuple : bag)
+ boolean first = true;
+ double p = 0.0d;
+ long numItems = 0L; // number of items processed, including rejected
+ long n1 = 0L;
+
+ for (Tuple tuple : bag)
{
- n += (Long) innerTuple.get(0);
+ if (first)
+ {
+ p = (Double) tuple.get(0);
+ first = false;
+ }
+
+ numItems += (Long) tuple.get(1);
+ n1 = Math.max((Long) tuple.get(2), numItems);
+
+ selected.addAll((DataBag) tuple.get(3));
+ aggWaiting.addAll((DataBag) tuple.get(4));
+ }
- selected.addAll((DataBag) innerTuple.get(1));
+ DataBag waiting = _BAG_FACTORY.newDefaultBag();
- double q1 = getQ1(n, _samplingProbability);
- double q2 = getQ2(n, _samplingProbability);
+ if (n1 > 0L)
+ {
+ double q1 = getQ1(n1, p);
+ double q2 = getQ2(n1, p);
- for (Tuple t : (DataBag) innerTuple.get(2))
+ for (Tuple t : aggWaiting)
{
ScoredTuple scored = ScoredTuple.fromIntermediateTuple(t);
@@ -220,94 +346,129 @@ public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
}
else if (scored.getScore() < q2)
{
- aggWaiting.add(t);
- }
- else
- {
- break;
+ waiting.add(t);
}
}
}
- double q1 = getQ1(n, _samplingProbability);
- double q2 = getQ2(n, _samplingProbability);
+ Tuple output = _TUPLE_FACTORY.newTuple();
- for (Tuple t : aggWaiting)
- {
- ScoredTuple scored = ScoredTuple.fromIntermediateTuple(t);
-
- if (scored.getScore() < q1)
- {
- selected.add(scored.getTuple());
- }
- else if (scored.getScore() < q2)
- {
- waiting.add(t);
- }
- else
- {
- break;
- }
- }
-
- output.append(n);
+ output.append(p);
+ output.append(numItems);
+ output.append(n1);
output.append(selected);
output.append(waiting);
- System.err.println("Read " + n + " items, selected " + selected.size()
- + ", and wait-listed " + aggWaiting.size() + ".");
-
return output;
}
}
static public class Final extends EvalFunc<DataBag>
{
- private double _samplingProbability;
-
public Final()
{
+ // empty
}
+ @Deprecated
public Final(String samplingProbability)
{
- _samplingProbability = Double.parseDouble(samplingProbability);
+ // empty
}
-
+
@Override
public DataBag exec(Tuple input) throws IOException
{
DataBag bag = (DataBag) input.get(0);
- long n = 0L;
- DataBag selected = bagFactory.newDefaultBag();
- DataBag waiting = bagFactory.newSortedBag(new ScoredTupleComparator());
- for (Tuple innerTuple : bag)
+ boolean first = true;
+ double p = 0.0d; // the sampling probability
+ long n = 0L; // the size of the population (total number of items)
+
+ DataBag selected = _BAG_FACTORY.newDefaultBag();
+ DataBag waiting = _BAG_FACTORY.newSortedBag(ScoredTupleComparator.getInstance());
+
+ for (Tuple tuple : bag)
{
- n += (Long) innerTuple.get(0);
- selected.addAll((DataBag) innerTuple.get(1));
- waiting.addAll((DataBag) innerTuple.get(2));
+ if (first)
+ {
+ p = (Double) tuple.get(0);
+ first = false;
+ }
+
+ n += (Long) tuple.get(1);
+ selected.addAll((DataBag) tuple.get(3));
+ waiting.addAll((DataBag) tuple.get(4));
}
- long sampleSize = (long) Math.ceil(_samplingProbability * n);
- long nNeeded = sampleSize - selected.size();
+ long numSelected = selected.size();
+ long numWaiting = waiting.size();
+
+ long s = (long) Math.ceil(p * n); // sample size
+
+ System.out.println("To sample " + s + " items from " + n + ", we pre-selected "
+ + numSelected + ", and waitlisted " + waiting.size() + ".");
+
+ long numNeeded = s - selected.size();
+
+ if (numNeeded < 0)
+ {
+ System.err.println("Pre-selected " + numSelected + " items, but only needed " + s
+ + ".");
+ }
for (Tuple scored : waiting)
{
- if (nNeeded <= 0)
+ if (numNeeded <= 0)
{
break;
}
selected.add(ScoredTuple.fromIntermediateTuple(scored).getTuple());
- nNeeded--;
+ numNeeded--;
+ }
+
+ if (numNeeded > 0)
+ {
+ System.err.println("The waiting list only has " + numWaiting
+ + " items, but needed " + numNeeded + " more.");
}
return selected;
}
}
- private static class ScoredTupleComparator implements Comparator<Tuple>
+ // computes a threshold to select items
+ private static double getQ1(long n, double p)
+ {
+ double t1 = 20.0d / (3.0d * n);
+ double q1 = p + t1 - Math.sqrt(t1 * t1 + 3.0d * t1 * p);
+ return q1;
+ }
+
+ // computes a threshold to reject items
+ private static double getQ2(long n, double p)
+ {
+ double t2 = 10.0d / n;
+ double q2 = p + t2 + Math.sqrt(t2 * t2 + 2.0d * t2 * p);
+ return q2;
+ }
+
+ private static void verifySamplingProbability(double p)
+ {
+ if(p < 0.0 || p > 1.0)
+ {
+ throw new IllegalArgumentException("Sampling probabiilty must be inside [0, 1].");
+ }
+ }
+
+ static class ScoredTupleComparator implements Comparator<Tuple>
{
+ public static final ScoredTupleComparator getInstance()
+ {
+ return _instance;
+ }
+
+ private static final ScoredTupleComparator _instance = new ScoredTupleComparator();
@Override
public int compare(Tuple o1, Tuple o2)
@@ -325,17 +486,4 @@ public class SimpleRandomSample extends AlgebraicEvalFunc<DataBag>
}
}
- private static double getQ1(long n, double p)
- {
- double t1 = 20.0 / (3.0 * n);
- double q1 = p + t1 - Math.sqrt(t1 * t1 + 3.0 * t1 * p);
- return q1;
- }
-
- private static double getQ2(long n, double p)
- {
- double t2 = 10.0 / n;
- double q2 = p + t2 + Math.sqrt(t2 * t2 + 2.0 * t2 * p);
- return q2;
- }
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/a4b17da9/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
index 7a0ced2..bd34881 100644
--- a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
+++ b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTest.java
@@ -37,11 +37,11 @@ public class SimpleRandomSampleTest extends PigTests
/**
* register $JAR_PATH
*
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
*
* data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
*
- * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data) as sample_data;
+ * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p) as sample_data;
*
* sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
*
@@ -50,8 +50,42 @@ public class SimpleRandomSampleTest extends PigTests
@Multiline
private String simpleRandomSampleTest;
+ /**
+ * register $JAR_PATH
+ *
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ *
+ * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ *
+ * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p, $n1) as sample_data;
+ *
+ * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
+ *
+ * STORE sampled INTO 'output';
+ */
+ @Multiline
+ private String simpleRandomSampleWithLowerBoundTest;
+
+ /**
+ * register $JAR_PATH
+ *
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
+ *
+ * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ *
+ * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data, $p1) as sample_1, SRS(data,
+ * $p2) AS sample_2;
+ *
+ * sampled = FOREACH sampled GENERATE COUNT(sample_1) AS sample_count_1, COUNT(sample_2)
+ * AS sample_count_2;
+ *
+ * STORE sampled INTO 'output';
+ */
+ @Multiline
+ private String simpleRandomSampleWithTwoCallsTest;
+
@Test
- public void simpleRandomSampleTest() throws Exception
+ public void testSimpleRandomSample() throws Exception
{
writeLinesToFile("input",
"A1\tB1\t1",
@@ -90,22 +124,42 @@ public class SimpleRandomSampleTest extends PigTests
int n = 32;
double p = 0.3;
int s = (int) Math.ceil(p * n);
- PigTest test =
- createPigTestFromString(simpleRandomSampleTest, "SAMPLING_PROBABILITY=" + p);
+ PigTest test = createPigTestFromString(simpleRandomSampleTest, "p=" + p);
test.runScript();
-
assertOutput(test, "sampled", "(" + s + ")");
+
+ int n1 = 30;
+ PigTest testWithLB =
+ createPigTestFromString(simpleRandomSampleWithLowerBoundTest, "p=" + p, "n1="
+ + n1);
+ testWithLB.runScript();
+ assertOutput(testWithLB, "sampled", "(" + s + ")");
+
+ double p1 = 0.05;
+ double p2 = 0.95;
+ int s1 = (int) Math.ceil(p1 * n);
+ int s2 = (int) Math.ceil(p2 * n);
+
+ PigTest testWithTwoCalls =
+ createPigTestFromString(simpleRandomSampleWithTwoCallsTest, "p1=" + p1, "p2="
+ + p2);
+ testWithTwoCalls.runScript();
+ assertOutput(testWithTwoCalls, "sampled", "(" + s1 + "," + s2 + ")");
+
+ test.runScript();
+
}
/**
* register $JAR_PATH
*
- * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample();
*
* data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
*
- * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data) as sample_data;
+ * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data,
+ * $SAMPLING_PROBABILITY) as sample_data;
*
* sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
*
@@ -117,7 +171,7 @@ public class SimpleRandomSampleTest extends PigTests
private String stratifiedSampleTest;
@Test
- public void stratifiedSampleTest() throws Exception
+ public void testStratifiedSample() throws Exception
{
writeLinesToFile("input",
"A1\tB1\t1",
@@ -171,4 +225,6 @@ public class SimpleRandomSampleTest extends PigTests
"(A8,1)",
"(A9,2)");
}
+
+
}
http://git-wip-us.apache.org/repos/asf/incubator-datafu/blob/a4b17da9/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
----------------------------------------------------------------------
diff --git a/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
new file mode 100644
index 0000000..15e1fd6
--- /dev/null
+++ b/test/pig/datafu/test/pig/sampling/SimpleRandomSampleTestOld.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package datafu.test.pig.sampling;
+
+import org.adrianwalker.multilinestring.Multiline;
+import org.apache.pig.pigunit.PigTest;
+import org.testng.annotations.Test;
+
+import datafu.pig.sampling.SimpleRandomSample;
+import datafu.test.pig.PigTests;
+
+/**
+ * Tests for {@link SimpleRandomSample}.
+ *
+ * @deprecated This tests the deprecated functionality of SimpleRandomSample
+ * where the probability can be specified in the constructor.
+ * @author ximeng
+ *
+ */
+public class SimpleRandomSampleTestOld extends PigTests
+{
+ /**
+ * register $JAR_PATH
+ *
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+ *
+ * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ *
+ * sampled = FOREACH (GROUP data ALL) GENERATE SRS(data) as sample_data;
+ *
+ * sampled = FOREACH sampled GENERATE COUNT(sample_data) AS sample_count;
+ *
+ * STORE sampled INTO 'output';
+ */
+ @Multiline
+ private String simpleRandomSampleTest;
+
+ @Test
+ public void simpleRandomSampleTest() throws Exception
+ {
+ writeLinesToFile("input",
+ "A1\tB1\t1",
+ "A1\tB1\t4",
+ "A1\tB3\t4",
+ "A1\tB4\t4",
+ "A2\tB1\t4",
+ "A2\tB2\t4",
+ "A3\tB1\t3",
+ "A3\tB1\t1",
+ "A3\tB3\t77",
+ "A4\tB1\t3",
+ "A4\tB2\t3",
+ "A4\tB3\t59",
+ "A4\tB4\t29",
+ "A5\tB1\t4",
+ "A6\tB2\t3",
+ "A6\tB2\t55",
+ "A6\tB3\t1",
+ "A7\tB1\t39",
+ "A7\tB2\t27",
+ "A7\tB3\t85",
+ "A8\tB1\t4",
+ "A8\tB2\t45",
+ "A9\tB3\t92",
+ "A9\tB3\t0",
+ "A9\tB6\t42",
+ "A9\tB5\t1",
+ "A10\tB1\t7",
+ "A10\tB2\t23",
+ "A10\tB2\t1",
+ "A10\tB2\t31",
+ "A10\tB6\t41",
+ "A10\tB7\t52");
+
+ int n = 32;
+ double p = 0.3;
+ int s = (int) Math.ceil(p * n);
+ PigTest test =
+ createPigTestFromString(simpleRandomSampleTest, "SAMPLING_PROBABILITY=" + p);
+
+ test.runScript();
+
+ assertOutput(test, "sampled", "(" + s + ")");
+ }
+
+ /**
+ * register $JAR_PATH
+ *
+ * DEFINE SRS datafu.pig.sampling.SimpleRandomSample('$SAMPLING_PROBABILITY');
+ *
+ * data = LOAD 'input' AS (A_id:chararray, B_id:chararray, C:int);
+ *
+ * sampled = FOREACH (GROUP data BY A_id) GENERATE group, SRS(data) as sample_data;
+ *
+ * sampled = FOREACH sampled GENERATE group, COUNT(sample_data) AS sample_count;
+ *
+ * sampled = ORDER sampled BY group;
+ *
+ * STORE sampled INTO 'output';
+ */
+ @Multiline
+ private String stratifiedSampleTest;
+
+ @Test
+ public void stratifiedSampleTest() throws Exception
+ {
+ writeLinesToFile("input",
+ "A1\tB1\t1",
+ "A1\tB1\t4",
+ "A1\tB3\t4",
+ "A1\tB4\t4",
+ "A2\tB1\t4",
+ "A2\tB2\t4",
+ "A3\tB1\t3",
+ "A3\tB1\t1",
+ "A3\tB3\t77",
+ "A4\tB1\t3",
+ "A4\tB2\t3",
+ "A4\tB3\t59",
+ "A4\tB4\t29",
+ "A5\tB1\t4",
+ "A6\tB2\t3",
+ "A6\tB2\t55",
+ "A6\tB3\t1",
+ "A7\tB1\t39",
+ "A7\tB2\t27",
+ "A7\tB3\t85",
+ "A8\tB1\t4",
+ "A8\tB2\t45",
+ "A9\tB3\t92",
+ "A9\tB3\t0",
+ "A9\tB6\t42",
+ "A9\tB5\t1",
+ "A10\tB1\t7",
+ "A10\tB2\t23",
+ "A10\tB2\t1",
+ "A10\tB2\t31",
+ "A10\tB6\t41",
+ "A10\tB7\t52");
+
+ double p = 0.5;
+
+ PigTest test =
+ createPigTestFromString(stratifiedSampleTest, "SAMPLING_PROBABILITY=" + p);
+ test.runScript();
+ assertOutput(test,
+ "sampled",
+ "(A1,2)",
+ "(A10,3)",
+ "(A2,1)",
+ "(A3,2)",
+ "(A4,2)",
+ "(A5,1)",
+ "(A6,2)",
+ "(A7,2)",
+ "(A8,1)",
+ "(A9,2)");
+ }
+}
[2/2] git commit: Merge with master
Posted by mh...@apache.org.
Merge with master
Project: http://git-wip-us.apache.org/repos/asf/incubator-datafu/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-datafu/commit/41a0c2cc
Tree: http://git-wip-us.apache.org/repos/asf/incubator-datafu/tree/41a0c2cc
Diff: http://git-wip-us.apache.org/repos/asf/incubator-datafu/diff/41a0c2cc
Branch: refs/heads/master
Commit: 41a0c2ccfc5e980b715d5d11d54eb673462de1a7
Parents: 424e3b4 a4b17da
Author: Matt Hayes <mh...@linkedin.com>
Authored: Mon Jan 27 17:37:05 2014 -0800
Committer: Matt Hayes <mh...@linkedin.com>
Committed: Mon Jan 27 17:38:22 2014 -0800
----------------------------------------------------------------------
.../datafu/pig/sampling/SimpleRandomSample.java | 386 +++++++++++++------
.../pig/sampling/SimpleRandomSampleTest.java | 74 +++-
.../pig/sampling/SimpleRandomSampleTestOld.java | 176 +++++++++
3 files changed, 508 insertions(+), 128 deletions(-)
----------------------------------------------------------------------