Posted to reviews@spark.apache.org by dorx <gi...@git.apache.org> on 2014/08/09 08:37:52 UTC
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
GitHub user dorx opened a pull request:
https://github.com/apache/spark/pull/1866
[SPARK-2937] Separate out sampleByKeyExact as its own API in PairRDDFunctions
This keeps the API consistent with Python and lets the `sampleByKeyExact` API carry the `Experimental` label.
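The difference between the two methods is easiest to see on a small in-memory example. The plain-Scala sketch below has hypothetical names and no Spark dependency. It mimics, conceptually, what the approximate `sampleByKey` does when sampling without replacement: it makes one pass and keeps each pair independently at its key's rate, so per-key sizes only approximate math.ceil(numItems * samplingRate). That target size, computed here by `targetSizes`, is the size `sampleByKeyExact` is documented to hit.

```scala
import scala.util.Random

// Hypothetical local illustration of stratified sampling; this is not Spark's code.
object StratifiedSketch {
  // Per-key target size, math.ceil(count * rate): the size sampleByKeyExact aims for.
  def targetSizes[K, V](data: Seq[(K, V)], fractions: Map[K, Double]): Map[K, Long] =
    data.groupBy(_._1).map { case (k, items) =>
      k -> math.ceil(items.size * fractions(k)).toLong
    }

  // One pass without replacement: keep each pair independently with its key's rate.
  // The resulting per-key sizes are random and only approximately equal to the targets.
  // Every key in `data` must have an entry in `fractions` (a missing key throws).
  def bernoulliByKey[K, V](data: Seq[(K, V)], fractions: Map[K, Double], seed: Long): Seq[(K, V)] = {
    require(fractions.values.forall(_ >= 0.0), "Negative sampling rates.")
    val rng = new Random(seed)
    data.filter { case (k, _) => rng.nextDouble() < fractions(k) }
  }
}
```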
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/dorx/spark stratified
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/1866.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1866
----
commit 14419775202e6eef1f0e1f0c74c7be9030aca73d
Author: Doris Xin <do...@gmail.com>
Date: 2014-05-29T22:22:14Z
SPARK-1939 Refactor takeSample method in RDD to use ScaSRS
commit ffea61a67d228edb476d29ca13a84bb3f9a22887
Author: Doris Xin <do...@gmail.com>
Date: 2014-05-30T00:55:54Z
SPARK-1939: Refactor takeSample method in RDD
Reviewer comments addressed:
- commons-math3 is now a test-only dependency, bumped up to v3.3
- comments added to explain what computeFraction is doing
- fixed the unit test for computeFraction to use BinomialDistribution for
without-replacement sampling
- stylistic fixes
commit 7cab53a3926f4351432e5e3600b0796b9a4146e4
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-02T19:00:38Z
fixed import bug in rdd.py
commit e3fd6a628317d559a08a7a20421e9c0618180902
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-02T19:06:18Z
Merge branch 'master' into takeSample
commit 9ee94ee3c28e8d808063fef4e5d39f06ab738e0b
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-09T20:15:23Z
[SPARK-2082] stratified sampling in PairRDDFunctions that guarantees exact sample size
commit 1d413ce877a67379a0a74afefba071c018b0ca70
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-09T21:14:26Z
fixed checkstyle issues
commit 7e1a48182ebec54cd3a6a290b1dc27b928f57dba
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-09T21:29:30Z
changed the permission on SamplingUtil
commit 46f6c8c86f3e1fdaf49b63796f1cd5bd6db79ec7
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-10T00:39:50Z
fixed the NPE caused by closures being cleaned before being passed into the aggregate function
commit 50581fc8b08bd5f18cdf2288772c22f2549af0a5
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-12T21:36:37Z
added a TODO for logging in python
commit 73276111a2c4a9354bfbf6414afceabf70fb21e5
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-13T00:28:14Z
merge master
commit 9e74ab505e5441eedfed5dbfbeac37566d3de1f0
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-17T22:24:22Z
Separated out most of the logic in sampleByKey
into StratifiedSampler in util.random
commit 90d94c0d4f4d909fb99b8e72f9a09ca5329e070c
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-17T22:57:11Z
merge master
commit 0214a7659c62e4ff0f68f6e09cd7846547cd3bcb
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-18T02:22:32Z
cleanUp
Addressed reviewer comments and added better documentation of code.
Added commons-math3 as a dependency of Spark (okay’ed by Matei). “mvn
clean install” compiled. Recovered files that were reverted by accident
in the merge.
TODOs: figure out API for sampleByKeyExact and update Java, Python, and
the markdown file accordingly.
commit 944a10cff3c218bafcb8b43e7e3f309cc644633e
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-19T02:48:22Z
[SPARK-2145] Add lower bound on sampling rate
to guarantee sampling performance
commit 1fe1cff58d99f336406f67f89f53fe6ab3bfde5e
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-19T19:46:59Z
Changed fractionByKey to a map to enable arg check
commit bd9dc6e08444ec83c5d6adc4c452b49d1ac2b154
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-19T20:00:47Z
unit bug and style violation fixed
commit 4ad516b14559f06a32d65ef0ce2fa2d7526610bc
Author: Doris Xin <do...@gmail.com>
Date: 2014-06-20T18:38:09Z
remove unused imports from PairRDDFunctions
commit 254e03c96e1f2aaa5baa9c3d384adeb117e0b7ab
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-03T20:49:46Z
minor fixes and Java API.
punting on python for now. moved aggregateWithContext out of RDD
commit 6b5b10b71669590f862ec2b7d004ba976d59676c
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-03T20:58:29Z
Merge branch 'master' into stratified
commit ee9d260e5eb6ee2d6912e57491daeded1704248c
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-06T23:20:39Z
addressed reviewer comments
commit bbfb8c91a68deae076db7d9fdc518b6cee8a99f1
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-06T23:21:20Z
Merge branch 'master' into stratified
commit 9884a9f03b18f3170d90d168c246d78fc463028e
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-08T18:58:39Z
style fix
commit 680b677bc5276e1499c59c7e24abfae7d85e5c7d
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-08T23:49:25Z
use mapPartitionsWithIndex instead
also better readability and lots more comments.
commit a2bf756454b2ae6b71ddc67457c0f69f165b937d
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-08T23:50:17Z
Merge branch 'master' into stratified
commit a10e68dd9c23b0fc1f25effd9ccd0ac3e7299206
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-09T01:09:14Z
style fix
commit f4c21f324075bc5e7b8cba07a5c47d23fade542f
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-15T01:34:10Z
Reviewer comments
Added BernoulliBounds
commit eecee5fb31aa678db872eccbd7385461ea3a6a96
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-15T02:56:58Z
Merge branch 'master' into stratified
Conflicts:
project/SparkBuild.scala
commit b3013a44078f5aa382d986df41af0e4067147077
Author: Xiangrui Meng <me...@databricks.com>
Date: 2014-07-25T05:21:51Z
move math3 back to test scope
commit b2235297636495061461db14e7a9709afb378bc1
Author: Xiangrui Meng <me...@databricks.com>
Date: 2014-07-25T06:09:34Z
use approx bounds for poisson
fix poisson mean for waitlisting
add unit tests for Java
commit ea7d27f01cf8bbe7e5be9dd19ed25ac0fc1e0a99
Author: Doris Xin <do...@gmail.com>
Date: 2014-07-28T18:41:06Z
merge master
----
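Several commits above (computeFraction, BernoulliBounds, approximate bounds for Poisson) deal with the same problem. To end up with at least s of n items after a single pass, the sampling rate has to be somewhat higher than s / n. For the without-replacement case, start from the multiplicative Chernoff bound P(X <= (1 - eps) * mu) <= exp(-eps^2 * mu / 2) and solve for the rate q. This gives q = p + gamma + sqrt(gamma^2 + 2 * gamma * p), with p = s / n and gamma = -ln(delta) / n. The sketch below implements that formula. The helper name and the default delta = 1e-4 are assumptions; the default is chosen to match the 99.99% confidence quoted later in the thread. Spark's own bound classes may differ in detail.

```scala
// Hypothetical helper illustrating a Chernoff-style sampling-rate adjustment.
object FractionBound {
  /**
   * Returns a rate q >= s / n, capped at 1, such that Bernoulli-sampling n items at
   * rate q yields fewer than s items with probability at most delta. Derived from the
   * multiplicative Chernoff bound P(X <= (1 - eps) * mu) <= exp(-eps * eps * mu / 2).
   */
  def adjustedFraction(s: Long, n: Long, delta: Double = 1e-4): Double = {
    require(n > 0 && s >= 0 && s <= n, "Need n > 0 and 0 <= s <= n.")
    require(delta > 0.0 && delta < 1.0, "Need 0 < delta < 1.")
    val p = s.toDouble / n
    val gamma = -math.log(delta) / n
    math.min(1.0, p + gamma + math.sqrt(gamma * gamma + 2 * gamma * p))
  }
}
```

For s = 100 and n = 10,000, the rate rises from 0.01 to about 0.0153. A larger delta (less confidence) gives a smaller adjustment.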
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1866#issuecomment-51679854
QA results for PR 1866:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18246/consoleFull
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/1866#discussion_r16032165
--- Diff: core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala ---
@@ -133,68 +133,64 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
- * `fractions`, a key to sampling rate map.
- *
- * If `exact` is set to false, create the sample via simple random sampling, with one pass
- * over the RDD, to produce a sample of size that's approximately equal to the sum of
- * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
- * the RDD to create a sample size that's exactly equal to the sum of
+ * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
+ * RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
- exact: Boolean,
seed: Long): JavaPairRDD[K, V] =
- new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))
+ new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, seed))
/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
- * `fractions`, a key to sampling rate map.
- *
- * If `exact` is set to false, create the sample via simple random sampling, with one pass
- * over the RDD, to produce a sample of size that's approximately equal to the sum of
- * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
- * the RDD to create a sample size that's exactly equal to the sum of
+ * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
+ * RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*
- * Use Utils.random.nextLong as the default seed for the random number generator
+ * Use Utils.random.nextLong as the default seed for the random number generator.
*/
def sampleByKey(withReplacement: Boolean,
- fractions: JMap[K, Double],
- exact: Boolean): JavaPairRDD[K, V] =
- sampleByKey(withReplacement, fractions, exact, Utils.random.nextLong)
+ fractions: JMap[K, Double]): JavaPairRDD[K, V] =
+ sampleByKey(withReplacement, fractions, Utils.random.nextLong)
/**
- * Return a subset of this RDD sampled by key (via stratified sampling).
+ * ::Experimental::
*
- * Create a sample of this RDD using variable sampling rates for different keys as specified by
- * `fractions`, a key to sampling rate map.
+ * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
+ * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
*
- * Produce a sample of size that's approximately equal to the sum of
- * math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
- * simple random sampling.
+ * This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
+ * create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
+ * over all key values with a 99.99% confidence. When sampling without replacement, we need one
+ * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
+ * two additional passes.
*/
- def sampleByKey(withReplacement: Boolean,
+ @Experimental
+ def sampleByKeyExact(withReplacement: Boolean,
fractions: JMap[K, Double],
seed: Long): JavaPairRDD[K, V] =
- sampleByKey(withReplacement, fractions, false, seed)
+ new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions, seed))
/**
- * Return a subset of this RDD sampled by key (via stratified sampling).
+ * ::Experimental::
*
--- End diff --
ditto: remove this line
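The `sampleByKeyExact` javadoc in the diff above promises exactly math.ceil(numItems * samplingRate) items per stratum with 99.99% confidence, at the cost of extra passes. For sampling without replacement, the target result can be described simply. Tag every pair with an independent uniform random number, then keep, per key, the pairs with the smallest tags. The naive sketch below does that in memory by sorting each stratum. The names are hypothetical, and this is not Spark's algorithm. Judging from the commit history's mention of waitlisting, Spark avoids the full sort: it accepts clearly small tags outright and resolves only borderline ones in the extra pass.

```scala
import scala.util.Random

// Hypothetical, naive in-memory sketch of exact-size stratified sampling without
// replacement. It sorts each stratum; a distributed implementation would avoid that.
object ExactStratified {
  def sampleByKeyExactLocal[K, V](data: Seq[(K, V)],
                                  fractions: Map[K, Double],
                                  seed: Long): Seq[(K, V)] = {
    require(fractions.values.forall(f => f >= 0.0 && f <= 1.0), "Rates must be in [0, 1].")
    val rng = new Random(seed)
    // Tag every pair with an independent uniform random number in [0, 1).
    val tagged = data.map(pair => (rng.nextDouble(), pair))
    tagged.groupBy(_._2._1).toSeq.flatMap { case (k, items) =>
      // Exact per-stratum size: math.ceil(count * rate).
      val target = math.ceil(items.size * fractions(k)).toInt
      // The `target` smallest tags form a uniformly random subset of exactly that size.
      items.sortBy(_._1).take(target).map(_._2)
    }
  }
}
```

Because the tags are i.i.d. uniform, every subset of the target size within a stratum is equally likely. This makes the result a simple random sample of exact size, unlike the one-pass Bernoulli approach.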
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1866#issuecomment-51730126
QA tests have started for PR 1866. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18289/consoleFull
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:
https://github.com/apache/spark/pull/1866#issuecomment-51731068
Merged into both master and branch-1.1. Thanks!
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/1866#discussion_r16032164
--- Diff: core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala ---
@@ -133,68 +133,64 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
- * `fractions`, a key to sampling rate map.
- *
- * If `exact` is set to false, create the sample via simple random sampling, with one pass
- * over the RDD, to produce a sample of size that's approximately equal to the sum of
- * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
- * the RDD to create a sample size that's exactly equal to the sum of
+ * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
+ * RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*/
def sampleByKey(withReplacement: Boolean,
fractions: JMap[K, Double],
- exact: Boolean,
seed: Long): JavaPairRDD[K, V] =
- new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))
+ new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, seed))
/**
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
- * `fractions`, a key to sampling rate map.
- *
- * If `exact` is set to false, create the sample via simple random sampling, with one pass
- * over the RDD, to produce a sample of size that's approximately equal to the sum of
- * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
- * the RDD to create a sample size that's exactly equal to the sum of
+ * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
+ * RDD, to produce a sample of size that's approximately equal to the sum of
* math.ceil(numItems * samplingRate) over all key values.
*
- * Use Utils.random.nextLong as the default seed for the random number generator
+ * Use Utils.random.nextLong as the default seed for the random number generator.
*/
def sampleByKey(withReplacement: Boolean,
- fractions: JMap[K, Double],
- exact: Boolean): JavaPairRDD[K, V] =
- sampleByKey(withReplacement, fractions, exact, Utils.random.nextLong)
+ fractions: JMap[K, Double]): JavaPairRDD[K, V] =
+ sampleByKey(withReplacement, fractions, Utils.random.nextLong)
/**
- * Return a subset of this RDD sampled by key (via stratified sampling).
+ * ::Experimental::
*
--- End diff --
Please remove this line so both `:: Experimental ::` and the first sentence show up in the summary of the generated doc. Otherwise, only `:: Experimental ::` appears in the summary.
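According to the comment above, a blank ` *` line between `:: Experimental ::` and the first sentence cuts the generated summary short. Below is a minimal sketch of the requested layout on a hypothetical method. The real Spark method also carries the `@Experimental` annotation, which is omitted here so the snippet compiles without Spark.

```scala
object DocLayoutExample {
  /**
   * :: Experimental ::
   * Return the exact per-stratum sample size, math.ceil(numItems * samplingRate).
   *
   * Longer explanations can follow in later paragraphs without affecting the summary.
   */
  // No blank ` *` line between the marker and the first sentence, so both reach the summary.
  def exactStratumSize(numItems: Long, samplingRate: Double): Long =
    math.ceil(numItems * samplingRate).toLong
}
```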
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/1866
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/1866#discussion_r16032172
--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -556,6 +519,97 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
intercept[IllegalArgumentException] {shuffled.lookup(-1)}
}
+ private object StratifiedAuxiliary {
+ def stratifier (fractionPositive: Double) = {
+ (x: Int) => if (x % 10 < (10 * fractionPositive).toInt) "1" else "0"
+ }
+
+ def checkSize(exact: Boolean,
+ withReplacement: Boolean,
+ expected: Long,
+ actual: Long,
+ p: Double): Boolean = {
+ if (exact) {
+ return expected == actual
+ }
+ val stdev = if (withReplacement) math.sqrt(expected) else math.sqrt(expected * p * (1 - p))
+ // Very forgiving margin since we're dealing with very small sample sizes most of the time
+ math.abs(actual - expected) <= 6 * stdev
+ }
+
+ def testSampleExact(stratifiedData: RDD[(String, Int)],
+ samplingRate: Double,
+ seed: Long,
+ n: Long) = {
+ testBernoulli(stratifiedData, true, samplingRate, seed, n)
+ testPoisson(stratifiedData, true, samplingRate, seed, n)
+ }
+
+ def testSample(stratifiedData: RDD[(String, Int)],
+ samplingRate: Double,
+ seed: Long,
+ n: Long) = {
+ testBernoulli(stratifiedData, false, samplingRate, seed, n)
+ testPoisson(stratifiedData, false, samplingRate, seed, n)
+ }
+
+ // Without replacement validation
+ def testBernoulli(stratifiedData: RDD[(String, Int)],
+ exact: Boolean,
+ samplingRate: Double,
+ seed: Long,
+ n: Long) = {
+ val expectedSampleSize = stratifiedData.countByKey()
+ .mapValues(count => math.ceil(count * samplingRate).toInt)
+ val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
+ val sample = if (exact) {
+ stratifiedData.sampleByKeyExact(false, fractions, seed)
+ } else {
+ stratifiedData.sampleByKey(false, fractions, seed)
+ }
+ val sampleCounts = sample.countByKey()
+ val takeSample = sample.collect()
+ sampleCounts.foreach { case(k, v) =>
+ assert(checkSize(exact, false, expectedSampleSize(k), v, samplingRate)) }
+ assert(takeSample.size === takeSample.toSet.size)
+ takeSample.foreach { x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]") }
+ }
+
+ // With replacement validation
+ def testPoisson(stratifiedData: RDD[(String, Int)],
+ exact: Boolean,
+ samplingRate: Double,
+ seed: Long,
+ n: Long) = {
+ val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
+ math.ceil(count * samplingRate).toInt)
+ val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
+ val sample = if (exact) {
+ stratifiedData.sampleByKeyExact(true, fractions, seed)
+ } else {
+ stratifiedData.sampleByKey(true, fractions, seed)
+ }
+ val sampleCounts = sample.countByKey()
+ val takeSample = sample.collect()
+ sampleCounts.foreach { case (k, v) =>
+ assert(checkSize(exact, true, expectedSampleSize(k), v, samplingRate)) }
+ val groupedByKey = takeSample.groupBy(_._1)
+ for ((key, v) <- groupedByKey) {
+ if (expectedSampleSize(key) >= 100 && samplingRate >= 0.1) {
+ // sample large enough for there to be repeats with high likelihood
+ assert(v.toSet.size < expectedSampleSize(key))
+ } else {
+ if (exact) {
+ assert(v.toSet.size <= expectedSampleSize(key))
+ } else {
+ assert(checkSize(false, true, expectedSampleSize(key), v.toSet.size, samplingRate))
+ }
+ }
+ }
+ takeSample.foreach { x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]") }
--- End diff --
Minor: prefer `takeSample.foreach(x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]"))`. We usually use `{ ... }` for pattern matching or multi-line statements.
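The convention described here is to use parentheses when the lambda is a single expression, and braces when it pattern-matches with `case` or spans several statements. A small self-contained illustration follows; the object and method names are hypothetical.

```scala
object BraceStyleExample {
  // Single-expression lambda: parentheses, as suggested in the review.
  def checkRange(sample: Seq[(String, Int)], n: Int): Unit =
    sample.foreach(x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]"))

  // Pattern matching with `case`: braces.
  def countByKey(sample: Seq[(String, Int)]): Map[String, Int] = {
    var counts = Map.empty[String, Int]
    sample.foreach { case (k, _) =>
      counts = counts.updated(k, counts.getOrElse(k, 0) + 1)
    }
    counts
  }
}
```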
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/1866#discussion_r16032168
--- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
@@ -1239,12 +1239,28 @@ public void sampleByKey() {
Assert.assertTrue(worCounts.size() == 2);
Assert.assertTrue(worCounts.get(0) > 0);
Assert.assertTrue(worCounts.get(1) > 0);
- JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKey(true, fractions, true, 1L);
+ }
+
+ @Test
+ @SuppressWarnings("unchecked")
+ public void sampleByKeyExact() {
+ JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
+ JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
+ new PairFunction<Integer, Integer, Integer>() {
--- End diff --
Using two-space indentation?
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/1866#discussion_r16032166
--- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
@@ -197,33 +197,57 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
* Return a subset of this RDD sampled by key (via stratified sampling).
*
* Create a sample of this RDD using variable sampling rates for different keys as specified by
- * `fractions`, a key to sampling rate map.
- *
- * If `exact` is set to false, create the sample via simple random sampling, with one pass
- * over the RDD, to produce a sample of size that's approximately equal to the sum of
- * math.ceil(numItems * samplingRate) over all key values; otherwise, use
- * additional passes over the RDD to create a sample size that's exactly equal to the sum of
- * math.ceil(numItems * samplingRate) over all key values with a 99.99% confidence. When sampling
- * without replacement, we need one additional pass over the RDD to guarantee sample size;
- * when sampling with replacement, we need two additional passes.
+ * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
+ * RDD, to produce a sample of size that's approximately equal to the sum of
+ * math.ceil(numItems * samplingRate) over all key values.
*
* @param withReplacement whether to sample with or without replacement
* @param fractions map of specific keys to sampling rates
* @param seed seed for the random number generator
- * @param exact whether sample size needs to be exactly math.ceil(fraction * size) per key
* @return RDD containing the sampled subset
*/
def sampleByKey(withReplacement: Boolean,
fractions: Map[K, Double],
- exact: Boolean = false,
- seed: Long = Utils.random.nextLong): RDD[(K, V)]= {
+ seed: Long = Utils.random.nextLong): RDD[(K, V)] = {
+
+ require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")
+
+ val samplingFunc = if (withReplacement) {
+ StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
+ } else {
+ StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
+ }
+ self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
+ }
+
+ /**
+ * ::Experimental::
+ *
--- End diff --
ditto: remove this line
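The new `sampleByKey` body does three things. It rejects negative rates, picks a Poisson (with replacement) or Bernoulli (without replacement) sampling function, and applies it with `mapPartitionsWithIndex`. A function passed there has the shape `(Int, Iterator[T]) => Iterator[T]`. The partition index is a convenient way to give each partition its own reproducible random stream. The sketch below builds such a function for `(String, Int)` pairs without Spark. Seeding with `seed + idx` and using Knuth's Poisson sampler are illustrative choices, not a description of `StratifiedSamplingUtils`.

```scala
import scala.util.Random

// Hypothetical sketch: a per-partition sampling function with the
// (partitionIndex, iterator) => iterator shape that mapPartitionsWithIndex expects.
object PartitionSampling {
  type KV = (String, Int)

  // Knuth's multiplication method for a Poisson(mean) draw; adequate for small means.
  private def poisson(mean: Double, rng: Random): Int = {
    val limit = math.exp(-mean)
    var k = 0
    var product = rng.nextDouble()
    while (product > limit) {
      k += 1
      product *= rng.nextDouble()
    }
    k
  }

  def samplingFunction(fractions: Map[String, Double],
                       withReplacement: Boolean,
                       seed: Long): (Int, Iterator[KV]) => Iterator[KV] = {
    require(fractions.values.forall(_ >= 0.0), "Negative sampling rates.")
    (idx: Int, iter: Iterator[KV]) => {
      // A distinct but reproducible random stream per partition.
      val rng = new Random(seed + idx)
      if (withReplacement) {
        // Emit each pair a Poisson(rate) number of times.
        iter.flatMap(pair => Iterator.fill(poisson(fractions(pair._1), rng))(pair))
      } else {
        // Keep each pair independently with probability equal to its key's rate.
        iter.filter(pair => rng.nextDouble() < fractions(pair._1))
      }
    }
  }
}
```

The random generator is created inside the returned function. As a result, re-running a partition with the same seed reproduces the same sample, which matters when Spark recomputes a lost partition.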
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1866#issuecomment-51731332
QA results for PR 1866:
- This patch PASSES unit tests.
- This patch merges cleanly.
- This patch adds no public classes.
For more information see test output:
https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18289/consoleFull
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:
https://github.com/apache/spark/pull/1866#issuecomment-51678949
QA tests have started for PR 1866. This patch merges cleanly.
View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18246/consoleFull
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:
https://github.com/apache/spark/pull/1866#discussion_r16032170
--- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
@@ -556,6 +519,97 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
intercept[IllegalArgumentException] {shuffled.lookup(-1)}
}
+ private object StratifiedAuxiliary {
+ def stratifier (fractionPositive: Double) = {
+ (x: Int) => if (x % 10 < (10 * fractionPositive).toInt) "1" else "0"
+ }
+
+ def checkSize(exact: Boolean,
+ withReplacement: Boolean,
+ expected: Long,
+ actual: Long,
+ p: Double): Boolean = {
+ if (exact) {
+ return expected == actual
+ }
+ val stdev = if (withReplacement) math.sqrt(expected) else math.sqrt(expected * p * (1 - p))
+ // Very forgiving margin since we're dealing with very small sample sizes most of the time
+ math.abs(actual - expected) <= 6 * stdev
+ }
+
+ def testSampleExact(stratifiedData: RDD[(String, Int)],
+ samplingRate: Double,
+ seed: Long,
+ n: Long) = {
+ testBernoulli(stratifiedData, true, samplingRate, seed, n)
+ testPoisson(stratifiedData, true, samplingRate, seed, n)
+ }
+
+ def testSample(stratifiedData: RDD[(String, Int)],
+ samplingRate: Double,
+ seed: Long,
+ n: Long) = {
+ testBernoulli(stratifiedData, false, samplingRate, seed, n)
+ testPoisson(stratifiedData, false, samplingRate, seed, n)
+ }
+
+ // Without replacement validation
+ def testBernoulli(stratifiedData: RDD[(String, Int)],
+ exact: Boolean,
+ samplingRate: Double,
+ seed: Long,
+ n: Long) = {
+ val expectedSampleSize = stratifiedData.countByKey()
+ .mapValues(count => math.ceil(count * samplingRate).toInt)
+ val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
+ val sample = if (exact) {
+ stratifiedData.sampleByKeyExact(false, fractions, seed)
+ } else {
+ stratifiedData.sampleByKey(false, fractions, seed)
+ }
+ val sampleCounts = sample.countByKey()
+ val takeSample = sample.collect()
+ sampleCounts.foreach { case(k, v) =>
+ assert(checkSize(exact, false, expectedSampleSize(k), v, samplingRate)) }
+ assert(takeSample.size === takeSample.toSet.size)
+ takeSample.foreach { x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]") }
+ }
+
+ // With replacement validation
+ def testPoisson(stratifiedData: RDD[(String, Int)],
+ exact: Boolean,
+ samplingRate: Double,
+ seed: Long,
+ n: Long) = {
+ val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
+ math.ceil(count * samplingRate).toInt)
+ val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
+ val sample = if (exact) {
+ stratifiedData.sampleByKeyExact(true, fractions, seed)
+ } else {
+ stratifiedData.sampleByKey(true, fractions, seed)
+ }
+ val sampleCounts = sample.countByKey()
+ val takeSample = sample.collect()
+ sampleCounts.foreach { case (k, v) =>
+ assert(checkSize(exact, true, expectedSampleSize(k), v, samplingRate)) }
--- End diff --
move `}` to next line
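The quoted `checkSize` helper accepts an approximate size if it lies within six standard deviations of the target. Below, it is restated outside Spark, together with the `testPoisson` loop reformatted with `}` on its own line as requested. The object and method names are hypothetical. For reference, Bernoulli-sampling `count` items at rate p keeps a number of items with variance count * p * (1 - p), which is about expected * (1 - p). The helper's expected * p * (1 - p) is smaller by a factor of p, so its non-exact check is stricter than a literal six-sigma rule.

```scala
object SizeCheck {
  // Restatement of the test's checkSize helper.
  def checkSize(exact: Boolean,
                withReplacement: Boolean,
                expected: Long,
                actual: Long,
                p: Double): Boolean = {
    if (exact) {
      expected == actual
    } else {
      // Poisson counts: variance equals the mean, so stdev = sqrt(expected).
      // Bernoulli branch: same formula as the test, expected * p * (1 - p).
      val stdev =
        if (withReplacement) math.sqrt(expected.toDouble)
        else math.sqrt(expected * p * (1 - p))
      // Very forgiving six-standard-deviation margin.
      math.abs(actual - expected) <= 6 * stdev
    }
  }

  // The loop from testPoisson, with the closing brace moved to its own line.
  def assertSizes(sampleCounts: Map[String, Long],
                  expectedSampleSize: Map[String, Long],
                  exact: Boolean,
                  withReplacement: Boolean,
                  samplingRate: Double): Unit = {
    sampleCounts.foreach { case (k, v) =>
      assert(checkSize(exact, withReplacement, expectedSampleSize(k), v, samplingRate))
    }
  }
}
```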
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/1866#issuecomment-51730672
Looks good. I also prefer separating this.
[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...
Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:
https://github.com/apache/spark/pull/1866#issuecomment-51720797
LGTM except inline comments. Thanks for keeping APIs consistent across languages!!