You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by dorx <gi...@git.apache.org> on 2014/08/09 08:37:52 UTC

[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

GitHub user dorx opened a pull request:

    https://github.com/apache/spark/pull/1866

    [SPARK-2937] Separate out samplyByKeyExact as its own API in PairRDDFunction

    To enable Python consistency and `Experimental` label of the `sampleByKeyExact` API.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dorx/spark stratified

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1866.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1866
    
----
commit 14419775202e6eef1f0e1f0c74c7be9030aca73d
Author: Doris Xin <do...@gmail.com>
Date:   2014-05-29T22:22:14Z

    SPARK-1939 Refactor takeSample method in RDD to use ScaSRS

commit ffea61a67d228edb476d29ca13a84bb3f9a22887
Author: Doris Xin <do...@gmail.com>
Date:   2014-05-30T00:55:54Z

    SPARK-1939: Refactor takeSample method in RDD
    
    Reviewer comments addressed:
    - commons-math3 is now a test-only dependency. bumped up to v3.3
    - comments added to explain what computeFraction is doing
    - fixed the unit for computeFraction to use BinomialDitro for without
    replacement sampling
    - stylistic fixes

commit 7cab53a3926f4351432e5e3600b0796b9a4146e4
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-02T19:00:38Z

    fixed import bug in rdd.py

commit e3fd6a628317d559a08a7a20421e9c0618180902
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-02T19:06:18Z

    Merge branch 'master' into takeSample

commit 9ee94ee3c28e8d808063fef4e5d39f06ab738e0b
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-09T20:15:23Z

    [SPARK-2082] stratified sampling in PairRDDFunctions that guarantees exact sample size

commit 1d413ce877a67379a0a74afefba071c018b0ca70
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-09T21:14:26Z

    fixed checkstyle issues

commit 7e1a48182ebec54cd3a6a290b1dc27b928f57dba
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-09T21:29:30Z

    changed the permission on SamplingUtil

commit 46f6c8c86f3e1fdaf49b63796f1cd5bd6db79ec7
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-10T00:39:50Z

    fixed the NPE caused by closures being cleaned before being passed into the aggregate function

commit 50581fc8b08bd5f18cdf2288772c22f2549af0a5
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-12T21:36:37Z

    added a TODO for logging in python

commit 73276111a2c4a9354bfbf6414afceabf70fb21e5
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-13T00:28:14Z

    merge master

commit 9e74ab505e5441eedfed5dbfbeac37566d3de1f0
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-17T22:24:22Z

    Separated out most of the logic in sampleByKey
    
    into StratifiedSampler in util.random

commit 90d94c0d4f4d909fb99b8e72f9a09ca5329e070c
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-17T22:57:11Z

    merge master

commit 0214a7659c62e4ff0f68f6e09cd7846547cd3bcb
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-18T02:22:32Z

    cleanUp
    
    Addressed reviewer comments and added better documentation of code.
    Added commons-math3 as a dependency of spark (okay’ed by Matei). “mvm
    clean install” compiled. Recovered files that were reverted by accident
    in the merge.
    TODOs: figure out API for sampleByKeyExact and update Java, Python, and
    the markdown file accordingly.

commit 944a10cff3c218bafcb8b43e7e3f309cc644633e
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-19T02:48:22Z

    [SPARK-2145] Add lower bound on sampling rate
    
    to guarantee sampling performance

commit 1fe1cff58d99f336406f67f89f53fe6ab3bfde5e
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-19T19:46:59Z

    Changed fractionByKey to a map to enable arg check

commit bd9dc6e08444ec83c5d6adc4c452b49d1ac2b154
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-19T20:00:47Z

    unit bug and style violation fixed

commit 4ad516b14559f06a32d65ef0ce2fa2d7526610bc
Author: Doris Xin <do...@gmail.com>
Date:   2014-06-20T18:38:09Z

    remove unused imports from PairRDDFunctions

commit 254e03c96e1f2aaa5baa9c3d384adeb117e0b7ab
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-03T20:49:46Z

    minor fixes and Java API.
    
    punting on python for now. moved aggregateWithContext out of RDD

commit 6b5b10b71669590f862ec2b7d004ba976d59676c
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-03T20:58:29Z

    Merge branch 'master' into stratified

commit ee9d260e5eb6ee2d6912e57491daeded1704248c
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-06T23:20:39Z

    addressed reviewer comments

commit bbfb8c91a68deae076db7d9fdc518b6cee8a99f1
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-06T23:21:20Z

    Merge branch 'master' into stratified

commit 9884a9f03b18f3170d90d168c246d78fc463028e
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-08T18:58:39Z

    style fix

commit 680b677bc5276e1499c59c7e24abfae7d85e5c7d
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-08T23:49:25Z

    use mapPartitionWithIndex instead
    
    also better readability and lots more comments.

commit a2bf756454b2ae6b71ddc67457c0f69f165b937d
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-08T23:50:17Z

    Merge branch 'master' into stratified

commit a10e68dd9c23b0fc1f25effd9ccd0ac3e7299206
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-09T01:09:14Z

    style fix

commit f4c21f324075bc5e7b8cba07a5c47d23fade542f
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-15T01:34:10Z

    Reviewer comments
    
    Added BernoulliBounds

commit eecee5fb31aa678db872eccbd7385461ea3a6a96
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-15T02:56:58Z

    Merge branch 'master' into stratified
    
    Conflicts:
    	project/SparkBuild.scala

commit b3013a44078f5aa382d986df41af0e4067147077
Author: Xiangrui Meng <me...@databricks.com>
Date:   2014-07-25T05:21:51Z

    move math3 back to test scope

commit b2235297636495061461db14e7a9709afb378bc1
Author: Xiangrui Meng <me...@databricks.com>
Date:   2014-07-25T06:09:34Z

    use approx bounds for poisson
    fix poisson mean for waitlisting
    add unit tests for Java

commit ea7d27f01cf8bbe7e5be9dd19ed25ac0fc1e0a99
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-28T18:41:06Z

    merge master

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1866#issuecomment-51679854
  
    QA results for PR 1866:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18246/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1866#discussion_r16032165
  
    --- Diff: core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala ---
    @@ -133,68 +133,64 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
        * Return a subset of this RDD sampled by key (via stratified sampling).
        *
        * Create a sample of this RDD using variable sampling rates for different keys as specified by
    -   * `fractions`, a key to sampling rate map.
    -   *
    -   * If `exact` is set to false, create the sample via simple random sampling, with one pass
    -   * over the RDD, to produce a sample of size that's approximately equal to the sum of
    -   * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
    -   * the RDD to create a sample size that's exactly equal to the sum of
    +   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
    +   * RDD, to produce a sample of size that's approximately equal to the sum of
        * math.ceil(numItems * samplingRate) over all key values.
        */
       def sampleByKey(withReplacement: Boolean,
           fractions: JMap[K, Double],
    -      exact: Boolean,
           seed: Long): JavaPairRDD[K, V] =
    -    new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))
    +    new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, seed))
     
       /**
        * Return a subset of this RDD sampled by key (via stratified sampling).
        *
        * Create a sample of this RDD using variable sampling rates for different keys as specified by
    -   * `fractions`, a key to sampling rate map.
    -   *
    -   * If `exact` is set to false, create the sample via simple random sampling, with one pass
    -   * over the RDD, to produce a sample of size that's approximately equal to the sum of
    -   * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
    -   * the RDD to create a sample size that's exactly equal to the sum of
    +   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
    +   * RDD, to produce a sample of size that's approximately equal to the sum of
        * math.ceil(numItems * samplingRate) over all key values.
        *
    -   * Use Utils.random.nextLong as the default seed for the random number generator
    +   * Use Utils.random.nextLong as the default seed for the random number generator.
        */
       def sampleByKey(withReplacement: Boolean,
    -      fractions: JMap[K, Double],
    -      exact: Boolean): JavaPairRDD[K, V] =
    -    sampleByKey(withReplacement, fractions, exact, Utils.random.nextLong)
    +      fractions: JMap[K, Double]): JavaPairRDD[K, V] =
    +    sampleByKey(withReplacement, fractions, Utils.random.nextLong)
     
       /**
    -   * Return a subset of this RDD sampled by key (via stratified sampling).
    +   * ::Experimental::
        *
    -   * Create a sample of this RDD using variable sampling rates for different keys as specified by
    -   * `fractions`, a key to sampling rate map.
    +   * Return a subset of this RDD sampled by key (via stratified sampling) containing exactly
    +   * math.ceil(numItems * samplingRate) for each stratum (group of pairs with the same key).
        *
    -   * Produce a sample of size that's approximately equal to the sum of
    -   * math.ceil(numItems * samplingRate) over all key values with one pass over the RDD via
    -   * simple random sampling.
    +   * This method differs from [[sampleByKey]] in that we make additional passes over the RDD to
    +   * create a sample size that's exactly equal to the sum of math.ceil(numItems * samplingRate)
    +   * over all key values with a 99.99% confidence. When sampling without replacement, we need one
    +   * additional pass over the RDD to guarantee sample size; when sampling with replacement, we need
    +   * two additional passes.
        */
    -  def sampleByKey(withReplacement: Boolean,
    +  @Experimental
    +  def sampleByKeyExact(withReplacement: Boolean,
           fractions: JMap[K, Double],
           seed: Long): JavaPairRDD[K, V] =
    -    sampleByKey(withReplacement, fractions, false, seed)
    +    new JavaPairRDD[K, V](rdd.sampleByKeyExact(withReplacement, fractions, seed))
     
       /**
    -   * Return a subset of this RDD sampled by key (via stratified sampling).
    +   * ::Experimental::
        *
    --- End diff --
    
    ditto: remove this line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1866#issuecomment-51730126
  
    QA tests have started for PR 1866. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18289/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1866#issuecomment-51731068
  
    Merged into both master and branch-1.1. Thanks!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1866#discussion_r16032164
  
    --- Diff: core/src/main/scala/org/apache/spark/api/java/JavaPairRDD.scala ---
    @@ -133,68 +133,64 @@ class JavaPairRDD[K, V](val rdd: RDD[(K, V)])
        * Return a subset of this RDD sampled by key (via stratified sampling).
        *
        * Create a sample of this RDD using variable sampling rates for different keys as specified by
    -   * `fractions`, a key to sampling rate map.
    -   *
    -   * If `exact` is set to false, create the sample via simple random sampling, with one pass
    -   * over the RDD, to produce a sample of size that's approximately equal to the sum of
    -   * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
    -   * the RDD to create a sample size that's exactly equal to the sum of
    +   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
    +   * RDD, to produce a sample of size that's approximately equal to the sum of
        * math.ceil(numItems * samplingRate) over all key values.
        */
       def sampleByKey(withReplacement: Boolean,
           fractions: JMap[K, Double],
    -      exact: Boolean,
           seed: Long): JavaPairRDD[K, V] =
    -    new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, exact, seed))
    +    new JavaPairRDD[K, V](rdd.sampleByKey(withReplacement, fractions, seed))
     
       /**
        * Return a subset of this RDD sampled by key (via stratified sampling).
        *
        * Create a sample of this RDD using variable sampling rates for different keys as specified by
    -   * `fractions`, a key to sampling rate map.
    -   *
    -   * If `exact` is set to false, create the sample via simple random sampling, with one pass
    -   * over the RDD, to produce a sample of size that's approximately equal to the sum of
    -   * math.ceil(numItems * samplingRate) over all key values; otherwise, use additional passes over
    -   * the RDD to create a sample size that's exactly equal to the sum of
    +   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
    +   * RDD, to produce a sample of size that's approximately equal to the sum of
        * math.ceil(numItems * samplingRate) over all key values.
        *
    -   * Use Utils.random.nextLong as the default seed for the random number generator
    +   * Use Utils.random.nextLong as the default seed for the random number generator.
        */
       def sampleByKey(withReplacement: Boolean,
    -      fractions: JMap[K, Double],
    -      exact: Boolean): JavaPairRDD[K, V] =
    -    sampleByKey(withReplacement, fractions, exact, Utils.random.nextLong)
    +      fractions: JMap[K, Double]): JavaPairRDD[K, V] =
    +    sampleByKey(withReplacement, fractions, Utils.random.nextLong)
     
       /**
    -   * Return a subset of this RDD sampled by key (via stratified sampling).
    +   * ::Experimental::
        *
    --- End diff --
    
    Please remove this line so both `:: Experimental ::` and the first sentence show up in the summary of the generated doc. Otherwise, only `:: Experimental ::` appears in the summary.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1866


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1866#discussion_r16032172
  
    --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
    @@ -556,6 +519,97 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
         intercept[IllegalArgumentException] {shuffled.lookup(-1)}
       }
     
    +  private object StratifiedAuxiliary {
    +    def stratifier (fractionPositive: Double) = {
    +      (x: Int) => if (x % 10 < (10 * fractionPositive).toInt) "1" else "0"
    +    }
    +
    +    def checkSize(exact: Boolean,
    +        withReplacement: Boolean,
    +        expected: Long,
    +        actual: Long,
    +        p: Double): Boolean = {
    +      if (exact) {
    +        return expected == actual
    +      }
    +      val stdev = if (withReplacement) math.sqrt(expected) else math.sqrt(expected * p * (1 - p))
    +      // Very forgiving margin since we're dealing with very small sample sizes most of the time
    +      math.abs(actual - expected) <= 6 * stdev
    +    }
    +
    +    def testSampleExact(stratifiedData: RDD[(String, Int)],
    +        samplingRate: Double,
    +        seed: Long,
    +        n: Long) = {
    +      testBernoulli(stratifiedData, true, samplingRate, seed, n)
    +      testPoisson(stratifiedData, true, samplingRate, seed, n)
    +    }
    +
    +    def testSample(stratifiedData: RDD[(String, Int)],
    +        samplingRate: Double,
    +        seed: Long,
    +        n: Long) = {
    +      testBernoulli(stratifiedData, false, samplingRate, seed, n)
    +      testPoisson(stratifiedData, false, samplingRate, seed, n)
    +    }
    +
    +    // Without replacement validation
    +    def testBernoulli(stratifiedData: RDD[(String, Int)],
    +        exact: Boolean,
    +        samplingRate: Double,
    +        seed: Long,
    +        n: Long) = {
    +      val expectedSampleSize = stratifiedData.countByKey()
    +        .mapValues(count => math.ceil(count * samplingRate).toInt)
    +      val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
    +      val sample = if (exact) {
    +        stratifiedData.sampleByKeyExact(false, fractions, seed)
    +      } else {
    +        stratifiedData.sampleByKey(false, fractions, seed)
    +      }
    +      val sampleCounts = sample.countByKey()
    +      val takeSample = sample.collect()
    +      sampleCounts.foreach { case(k, v) =>
    +        assert(checkSize(exact, false, expectedSampleSize(k), v, samplingRate)) }
    +      assert(takeSample.size === takeSample.toSet.size)
    +      takeSample.foreach { x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]") }
    +    }
    +
    +    // With replacement validation
    +    def testPoisson(stratifiedData: RDD[(String, Int)],
    +        exact: Boolean,
    +        samplingRate: Double,
    +        seed: Long,
    +        n: Long) = {
    +      val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
    +        math.ceil(count * samplingRate).toInt)
    +      val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
    +      val sample = if (exact) {
    +        stratifiedData.sampleByKeyExact(true, fractions, seed)
    +      } else {
    +        stratifiedData.sampleByKey(true, fractions, seed)
    +      }
    +      val sampleCounts = sample.countByKey()
    +      val takeSample = sample.collect()
    +      sampleCounts.foreach { case (k, v) =>
    +        assert(checkSize(exact, true, expectedSampleSize(k), v, samplingRate)) }
    +      val groupedByKey = takeSample.groupBy(_._1)
    +      for ((key, v) <- groupedByKey) {
    +        if (expectedSampleSize(key) >= 100 && samplingRate >= 0.1) {
    +          // sample large enough for there to be repeats with high likelihood
    +          assert(v.toSet.size < expectedSampleSize(key))
    +        } else {
    +          if (exact) {
    +            assert(v.toSet.size <= expectedSampleSize(key))
    +          } else {
    +            assert(checkSize(false, true, expectedSampleSize(key), v.toSet.size, samplingRate))
    +          }
    +        }
    +      }
    +      takeSample.foreach { x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]") }
    --- End diff --
    
    minor: `takeSample.foreach(x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]"))` We usually use `{ ... }` for pattern matching or multi-line statement.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1866#discussion_r16032168
  
    --- Diff: core/src/test/java/org/apache/spark/JavaAPISuite.java ---
    @@ -1239,12 +1239,28 @@ public void sampleByKey() {
         Assert.assertTrue(worCounts.size() == 2);
         Assert.assertTrue(worCounts.get(0) > 0);
         Assert.assertTrue(worCounts.get(1) > 0);
    -    JavaPairRDD<Integer, Integer> wrExact = rdd2.sampleByKey(true, fractions, true, 1L);
    +  }
    +
    +  @Test
    +  @SuppressWarnings("unchecked")
    +  public void sampleByKeyExact() {
    +    JavaRDD<Integer> rdd1 = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8), 3);
    +    JavaPairRDD<Integer, Integer> rdd2 = rdd1.mapToPair(
    +            new PairFunction<Integer, Integer, Integer>() {
    --- End diff --
    
    Using two-space indentation?


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1866#discussion_r16032166
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala ---
    @@ -197,33 +197,57 @@ class PairRDDFunctions[K, V](self: RDD[(K, V)])
        * Return a subset of this RDD sampled by key (via stratified sampling).
        *
        * Create a sample of this RDD using variable sampling rates for different keys as specified by
    -   * `fractions`, a key to sampling rate map.
    -   *
    -   * If `exact` is set to false, create the sample via simple random sampling, with one pass
    -   * over the RDD, to produce a sample of size that's approximately equal to the sum of
    -   * math.ceil(numItems * samplingRate) over all key values; otherwise, use
    -   * additional passes over the RDD to create a sample size that's exactly equal to the sum of
    -   * math.ceil(numItems * samplingRate) over all key values with a 99.99% confidence. When sampling
    -   * without replacement, we need one additional pass over the RDD to guarantee sample size;
    -   * when sampling with replacement, we need two additional passes.
    +   * `fractions`, a key to sampling rate map, via simple random sampling with one pass over the
    +   * RDD, to produce a sample of size that's approximately equal to the sum of
    +   * math.ceil(numItems * samplingRate) over all key values.
        *
        * @param withReplacement whether to sample with or without replacement
        * @param fractions map of specific keys to sampling rates
        * @param seed seed for the random number generator
    -   * @param exact whether sample size needs to be exactly math.ceil(fraction * size) per key
        * @return RDD containing the sampled subset
        */
       def sampleByKey(withReplacement: Boolean,
           fractions: Map[K, Double],
    -      exact: Boolean = false,
    -      seed: Long = Utils.random.nextLong): RDD[(K, V)]= {
    +      seed: Long = Utils.random.nextLong): RDD[(K, V)] = {
    +
    +    require(fractions.values.forall(v => v >= 0.0), "Negative sampling rates.")
    +
    +    val samplingFunc = if (withReplacement) {
    +      StratifiedSamplingUtils.getPoissonSamplingFunction(self, fractions, false, seed)
    +    } else {
    +      StratifiedSamplingUtils.getBernoulliSamplingFunction(self, fractions, false, seed)
    +    }
    +    self.mapPartitionsWithIndex(samplingFunc, preservesPartitioning = true)
    +  }
    +
    +  /**
    +   * ::Experimental::
    +   *
    --- End diff --
    
    ditto: remove this line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1866#issuecomment-51731332
  
    QA results for PR 1866:<br>- This patch PASSES unit tests.<br>- This patch merges cleanly<br>- This patch adds no public classes<br><br>For more information see test ouptut:<br>https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18289/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1866#issuecomment-51678949
  
    QA tests have started for PR 1866. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/18246/consoleFull


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1866#discussion_r16032170
  
    --- Diff: core/src/test/scala/org/apache/spark/rdd/PairRDDFunctionsSuite.scala ---
    @@ -556,6 +519,97 @@ class PairRDDFunctionsSuite extends FunSuite with SharedSparkContext {
         intercept[IllegalArgumentException] {shuffled.lookup(-1)}
       }
     
    +  private object StratifiedAuxiliary {
    +    def stratifier (fractionPositive: Double) = {
    +      (x: Int) => if (x % 10 < (10 * fractionPositive).toInt) "1" else "0"
    +    }
    +
    +    def checkSize(exact: Boolean,
    +        withReplacement: Boolean,
    +        expected: Long,
    +        actual: Long,
    +        p: Double): Boolean = {
    +      if (exact) {
    +        return expected == actual
    +      }
    +      val stdev = if (withReplacement) math.sqrt(expected) else math.sqrt(expected * p * (1 - p))
    +      // Very forgiving margin since we're dealing with very small sample sizes most of the time
    +      math.abs(actual - expected) <= 6 * stdev
    +    }
    +
    +    def testSampleExact(stratifiedData: RDD[(String, Int)],
    +        samplingRate: Double,
    +        seed: Long,
    +        n: Long) = {
    +      testBernoulli(stratifiedData, true, samplingRate, seed, n)
    +      testPoisson(stratifiedData, true, samplingRate, seed, n)
    +    }
    +
    +    def testSample(stratifiedData: RDD[(String, Int)],
    +        samplingRate: Double,
    +        seed: Long,
    +        n: Long) = {
    +      testBernoulli(stratifiedData, false, samplingRate, seed, n)
    +      testPoisson(stratifiedData, false, samplingRate, seed, n)
    +    }
    +
    +    // Without replacement validation
    +    def testBernoulli(stratifiedData: RDD[(String, Int)],
    +        exact: Boolean,
    +        samplingRate: Double,
    +        seed: Long,
    +        n: Long) = {
    +      val expectedSampleSize = stratifiedData.countByKey()
    +        .mapValues(count => math.ceil(count * samplingRate).toInt)
    +      val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
    +      val sample = if (exact) {
    +        stratifiedData.sampleByKeyExact(false, fractions, seed)
    +      } else {
    +        stratifiedData.sampleByKey(false, fractions, seed)
    +      }
    +      val sampleCounts = sample.countByKey()
    +      val takeSample = sample.collect()
    +      sampleCounts.foreach { case(k, v) =>
    +        assert(checkSize(exact, false, expectedSampleSize(k), v, samplingRate)) }
    +      assert(takeSample.size === takeSample.toSet.size)
    +      takeSample.foreach { x => assert(1 <= x._2 && x._2 <= n, s"elements not in [1, $n]") }
    +    }
    +
    +    // With replacement validation
    +    def testPoisson(stratifiedData: RDD[(String, Int)],
    +        exact: Boolean,
    +        samplingRate: Double,
    +        seed: Long,
    +        n: Long) = {
    +      val expectedSampleSize = stratifiedData.countByKey().mapValues(count =>
    +        math.ceil(count * samplingRate).toInt)
    +      val fractions = Map("1" -> samplingRate, "0" -> samplingRate)
    +      val sample = if (exact) {
    +        stratifiedData.sampleByKeyExact(true, fractions, seed)
    +      } else {
    +        stratifiedData.sampleByKey(true, fractions, seed)
    +      }
    +      val sampleCounts = sample.countByKey()
    +      val takeSample = sample.collect()
    +      sampleCounts.foreach { case (k, v) =>
    +        assert(checkSize(exact, true, expectedSampleSize(k), v, samplingRate)) }
    --- End diff --
    
    move `}` to next line


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/1866#issuecomment-51730672
  
    Looks good, I also prefer separating this


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] spark pull request: [SPARK-2937] Separate out samplyByKeyExact as ...

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1866#issuecomment-51720797
  
    LGTM except inline comments. Thanks for keeping APIs consistent across languages!!


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org