You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/10/31 06:31:02 UTC

git commit: [SPARK-3250] Implement Gap Sampling optimization for random sampling

Repository: spark
Updated Branches:
  refs/heads/master 872fc669b -> ad3bd0dff


[SPARK-3250] Implement Gap Sampling optimization for random sampling

More efficient sampling, based on Gap Sampling optimization:
http://erikerlandson.github.io/blog/2014/09/11/faster-random-samples-with-gap-sampling/

Author: Erik Erlandson <ee...@redhat.com>

Closes #2455 from erikerlandson/spark-3250-pr and squashes the following commits:

72496bc [Erik Erlandson] [SPARK-3250] Implement Gap Sampling optimization for random sampling


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ad3bd0df
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ad3bd0df
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ad3bd0df

Branch: refs/heads/master
Commit: ad3bd0dff8997861c5a04438145ba6f91c57a849
Parents: 872fc66
Author: Erik Erlandson <ee...@redhat.com>
Authored: Thu Oct 30 22:30:52 2014 -0700
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Oct 30 22:30:52 2014 -0700

----------------------------------------------------------------------
 .../main/scala/org/apache/spark/rdd/RDD.scala   |   6 +-
 .../spark/util/random/RandomSampler.scala       | 286 ++++++++-
 .../java/org/apache/spark/JavaAPISuite.java     |   9 +-
 .../spark/util/random/RandomSamplerSuite.scala  | 606 ++++++++++++++++---
 .../org/apache/spark/mllib/util/MLUtils.scala   |   4 +-
 5 files changed, 790 insertions(+), 121 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index b7f125d..c169b2d 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -43,7 +43,8 @@ import org.apache.spark.partial.PartialResult
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.util.{BoundedPriorityQueue, Utils, CallSite}
 import org.apache.spark.util.collection.OpenHashMap
-import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, SamplingUtils}
+import org.apache.spark.util.random.{BernoulliSampler, PoissonSampler, BernoulliCellSampler,
+  SamplingUtils}
 
 /**
  * A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,
@@ -375,7 +376,8 @@ abstract class RDD[T: ClassTag](
     val sum = weights.sum
     val normalizedCumWeights = weights.map(_ / sum).scanLeft(0.0d)(_ + _)
     normalizedCumWeights.sliding(2).map { x =>
-      new PartitionwiseSampledRDD[T, T](this, new BernoulliSampler[T](x(0), x(1)), true, seed)
+      new PartitionwiseSampledRDD[T, T](
+        this, new BernoulliCellSampler[T](x(0), x(1)), true, seed)
     }.toArray
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
index ee389de..76e7a27 100644
--- a/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/RandomSampler.scala
@@ -19,6 +19,9 @@ package org.apache.spark.util.random
 
 import java.util.Random
 
+import scala.reflect.ClassTag
+import scala.collection.mutable.ArrayBuffer
+
 import org.apache.commons.math3.distribution.PoissonDistribution
 
 import org.apache.spark.annotation.DeveloperApi
@@ -38,13 +41,47 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
   /** take a random sample */
   def sample(items: Iterator[T]): Iterator[U]
 
+  /** return a copy of the RandomSampler object */
   override def clone: RandomSampler[T, U] =
     throw new NotImplementedError("clone() is not implemented.")
 }
 
+private[spark]
+object RandomSampler {
+  /** Default random number generator used by random samplers. */
+  def newDefaultRNG: Random = new XORShiftRandom
+
+  /**
+   * Default maximum gap-sampling fraction.
+   * For sampling fractions <= this value, the gap sampling optimization will be applied.
+   * Above this value, it is assumed that "tradtional" Bernoulli sampling is faster.  The
+   * optimal value for this will depend on the RNG.  More expensive RNGs will tend to make
+   * the optimal value higher.  The most reliable way to determine this value for a new RNG
+   * is to experiment.  When tuning for a new RNG, I would expect a value of 0.5 to be close
+   * in most cases, as an initial guess.
+   */
+  val defaultMaxGapSamplingFraction = 0.4
+
+  /**
+   * Default epsilon for floating point numbers sampled from the RNG.
+   * The gap-sampling compute logic requires taking log(x), where x is sampled from an RNG.
+   * To guard against errors from taking log(0), a positive epsilon lower bound is applied.
+   * A good value for this parameter is at or near the minimum positive floating
+   * point value returned by "nextDouble()" (or equivalent), for the RNG being used.
+   */
+  val rngEpsilon = 5e-11
+
+  /**
+   * Sampling fraction arguments may be results of computation, and subject to floating
+   * point jitter.  I check the arguments with this epsilon slop factor to prevent spurious
+   * warnings for cases such as summing some numbers to get a sampling fraction of 1.000000001
+   */
+  val roundingEpsilon = 1e-6
+}
+
 /**
  * :: DeveloperApi ::
- * A sampler based on Bernoulli trials.
+ * A sampler based on Bernoulli trials for partitioning a data sequence.
  *
  * @param lb lower bound of the acceptance range
  * @param ub upper bound of the acceptance range
@@ -52,57 +89,262 @@ trait RandomSampler[T, U] extends Pseudorandom with Cloneable with Serializable
  * @tparam T item type
  */
 @DeveloperApi
-class BernoulliSampler[T](lb: Double, ub: Double, complement: Boolean = false)
+class BernoulliCellSampler[T](lb: Double, ub: Double, complement: Boolean = false)
   extends RandomSampler[T, T] {
 
-  private[random] var rng: Random = new XORShiftRandom
+  /** epsilon slop to avoid failure from floating point jitter. */
+  require(
+    lb <= (ub + RandomSampler.roundingEpsilon),
+    s"Lower bound ($lb) must be <= upper bound ($ub)")
+  require(
+    lb >= (0.0 - RandomSampler.roundingEpsilon),
+    s"Lower bound ($lb) must be >= 0.0")
+  require(
+    ub <= (1.0 + RandomSampler.roundingEpsilon),
+    s"Upper bound ($ub) must be <= 1.0")
 
-  def this(ratio: Double) = this(0.0d, ratio)
+  private val rng: Random = new XORShiftRandom
 
   override def setSeed(seed: Long) = rng.setSeed(seed)
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.filter { item =>
-      val x = rng.nextDouble()
-      (x >= lb && x < ub) ^ complement
+    if (ub - lb <= 0.0) {
+      if (complement) items else Iterator.empty
+    } else {
+      if (complement) {
+        items.filter { item => {
+          val x = rng.nextDouble()
+          (x < lb) || (x >= ub)
+        }}
+      } else {
+        items.filter { item => {
+          val x = rng.nextDouble()
+          (x >= lb) && (x < ub)
+        }}
+      }
     }
   }
 
   /**
    *  Return a sampler that is the complement of the range specified of the current sampler.
    */
-  def cloneComplement(): BernoulliSampler[T] = new BernoulliSampler[T](lb, ub, !complement)
+  def cloneComplement(): BernoulliCellSampler[T] =
+    new BernoulliCellSampler[T](lb, ub, !complement)
+
+  override def clone = new BernoulliCellSampler[T](lb, ub, complement)
+}
+
+
+/**
+ * :: DeveloperApi ::
+ * A sampler based on Bernoulli trials.
+ *
+ * @param fraction the sampling fraction, aka Bernoulli sampling probability
+ * @tparam T item type
+ */
+@DeveloperApi
+class BernoulliSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** epsilon slop to avoid failure from floating point jitter */
+  require(
+    fraction >= (0.0 - RandomSampler.roundingEpsilon)
+      && fraction <= (1.0 + RandomSampler.roundingEpsilon),
+    s"Sampling fraction ($fraction) must be on interval [0, 1]")
 
-  override def clone = new BernoulliSampler[T](lb, ub, complement)
+  private val rng: Random = RandomSampler.newDefaultRNG
+
+  override def setSeed(seed: Long) = rng.setSeed(seed)
+
+  override def sample(items: Iterator[T]): Iterator[T] = {
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction >= 1.0) {
+      items
+    } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
+      new GapSamplingIterator(items, fraction, rng, RandomSampler.rngEpsilon)
+    } else {
+      items.filter { _ => rng.nextDouble() <= fraction }
+    }
+  }
+
+  override def clone = new BernoulliSampler[T](fraction)
 }
 
+
 /**
  * :: DeveloperApi ::
- * A sampler based on values drawn from Poisson distribution.
+ * A sampler for sampling with replacement, based on values drawn from Poisson distribution.
  *
- * @param mean Poisson mean
+ * @param fraction the sampling fraction (with replacement)
  * @tparam T item type
  */
 @DeveloperApi
-class PoissonSampler[T](mean: Double) extends RandomSampler[T, T] {
+class PoissonSampler[T: ClassTag](fraction: Double) extends RandomSampler[T, T] {
+
+  /** Epsilon slop to avoid failure from floating point jitter. */
+  require(
+    fraction >= (0.0 - RandomSampler.roundingEpsilon),
+    s"Sampling fraction ($fraction) must be >= 0")
 
-  private[random] var rng = new PoissonDistribution(mean)
+  // PoissonDistribution throws an exception when fraction <= 0
+  // If fraction is <= 0, Iterator.empty is used below, so we can use any placeholder value.
+  private val rng = new PoissonDistribution(if (fraction > 0.0) fraction else 1.0)
+  private val rngGap = RandomSampler.newDefaultRNG
 
   override def setSeed(seed: Long) {
-    rng = new PoissonDistribution(mean)
     rng.reseedRandomGenerator(seed)
+    rngGap.setSeed(seed)
   }
 
   override def sample(items: Iterator[T]): Iterator[T] = {
-    items.flatMap { item =>
-      val count = rng.sample()
-      if (count == 0) {
-        Iterator.empty
-      } else {
-        Iterator.fill(count)(item)
-      }
+    if (fraction <= 0.0) {
+      Iterator.empty
+    } else if (fraction <= RandomSampler.defaultMaxGapSamplingFraction) {
+        new GapSamplingReplacementIterator(items, fraction, rngGap, RandomSampler.rngEpsilon)
+    } else {
+      items.flatMap { item => {
+        val count = rng.sample()
+        if (count == 0) Iterator.empty else Iterator.fill(count)(item)
+      }}
+    }
+  }
+
+  override def clone = new PoissonSampler[T](fraction)
+}
+
+
+private[spark]
+class GapSamplingIterator[T: ClassTag](
+    var data: Iterator[T],
+    f: Double,
+    rng: Random = RandomSampler.newDefaultRNG,
+    epsilon: Double = RandomSampler.rngEpsilon) extends Iterator[T] {
+
+  require(f > 0.0  &&  f < 1.0, s"Sampling fraction ($f) must reside on open interval (0, 1)")
+  require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
+
+  /** implement efficient linear-sequence drop until Scala includes fix for jira SI-8835. */
+  private val iterDrop: Int => Unit = {
+    val arrayClass = Array.empty[T].iterator.getClass
+    val arrayBufferClass = ArrayBuffer.empty[T].iterator.getClass
+    data.getClass match {
+      case `arrayClass` => ((n: Int) => { data = data.drop(n) })
+      case `arrayBufferClass` => ((n: Int) => { data = data.drop(n) })
+      case _ => ((n: Int) => {
+          var j = 0
+          while (j < n && data.hasNext) {
+            data.next()
+            j += 1
+          }
+        })
+    }
+  }
+
+  override def hasNext: Boolean = data.hasNext
+
+  override def next(): T = {
+    val r = data.next()
+    advance
+    r
+  }
+
+  private val lnq = math.log1p(-f)
+
+  /** skip elements that won't be sampled, according to geometric dist P(k) = (f)(1-f)^k. */
+  private def advance: Unit = {
+    val u = math.max(rng.nextDouble(), epsilon)
+    val k = (math.log(u) / lnq).toInt
+    iterDrop(k)
+  }
+
+  /** advance to first sample as part of object construction. */
+  advance
+  // Attempting to invoke this closer to the top with other object initialization
+  // was causing it to break in strange ways, so I'm invoking it last, which seems to
+  // work reliably.
+}
+
+private[spark]
+class GapSamplingReplacementIterator[T: ClassTag](
+    var data: Iterator[T],
+    f: Double,
+    rng: Random = RandomSampler.newDefaultRNG,
+    epsilon: Double = RandomSampler.rngEpsilon) extends Iterator[T] {
+
+  require(f > 0.0, s"Sampling fraction ($f) must be > 0")
+  require(epsilon > 0.0, s"epsilon ($epsilon) must be > 0")
+
+  /** implement efficient linear-sequence drop until scala includes fix for jira SI-8835. */
+  private val iterDrop: Int => Unit = {
+    val arrayClass = Array.empty[T].iterator.getClass
+    val arrayBufferClass = ArrayBuffer.empty[T].iterator.getClass
+    data.getClass match {
+      case `arrayClass` => ((n: Int) => { data = data.drop(n) })
+      case `arrayBufferClass` => ((n: Int) => { data = data.drop(n) })
+      case _ => ((n: Int) => {
+          var j = 0
+          while (j < n && data.hasNext) {
+            data.next()
+            j += 1
+          }
+        })
+    }
+  }
+
+  /** current sampling value, and its replication factor, as we are sampling with replacement. */
+  private var v: T = _
+  private var rep: Int = 0
+
+  override def hasNext: Boolean = data.hasNext || rep > 0
+
+  override def next(): T = {
+    val r = v
+    rep -= 1
+    if (rep <= 0) advance
+    r
+  }
+
+  /**
+   * Skip elements with replication factor zero (i.e. elements that won't be sampled).
+   * Samples 'k' from geometric distribution  P(k) = (1-q)(q)^k, where q = e^(-f), that is
+   * q is the probabililty of Poisson(0; f)
+   */
+  private def advance: Unit = {
+    val u = math.max(rng.nextDouble(), epsilon)
+    val k = (math.log(u) / (-f)).toInt
+    iterDrop(k)
+    // set the value and replication factor for the next value
+    if (data.hasNext) {
+      v = data.next()
+      rep = poissonGE1
+    }
+  }
+
+  private val q = math.exp(-f)
+
+  /**
+   * Sample from Poisson distribution, conditioned such that the sampled value is >= 1.
+   * This is an adaptation from the algorithm for Generating Poisson distributed random variables:
+   * http://en.wikipedia.org/wiki/Poisson_distribution
+   */
+  private def poissonGE1: Int = {
+    // simulate that the standard poisson sampling
+    // gave us at least one iteration, for a sample of >= 1
+    var pp = q + ((1.0 - q) * rng.nextDouble())
+    var r = 1
+
+    // now continue with standard poisson sampling algorithm
+    pp *= rng.nextDouble()
+    while (pp > q) {
+      r += 1
+      pp *= rng.nextDouble()
     }
+    r
   }
 
-  override def clone = new PoissonSampler[T](mean)
+  /** advance to first sample as part of object construction. */
+  advance
+  // Attempting to invoke this closer to the top with other object initialization
+  // was causing it to break in strange ways, so I'm invoking it last, which seems to
+  // work reliably.
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/test/java/org/apache/spark/JavaAPISuite.java
----------------------------------------------------------------------
diff --git a/core/src/test/java/org/apache/spark/JavaAPISuite.java b/core/src/test/java/org/apache/spark/JavaAPISuite.java
index 0172876..c21a4b3 100644
--- a/core/src/test/java/org/apache/spark/JavaAPISuite.java
+++ b/core/src/test/java/org/apache/spark/JavaAPISuite.java
@@ -140,11 +140,10 @@ public class JavaAPISuite implements Serializable {
   public void sample() {
     List<Integer> ints = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
     JavaRDD<Integer> rdd = sc.parallelize(ints);
-    JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 11);
-    // expected 2 but of course result varies randomly a bit
-    Assert.assertEquals(1, sample20.count());
-    JavaRDD<Integer> sample20NoReplacement = rdd.sample(false, 0.2, 11);
-    Assert.assertEquals(2, sample20NoReplacement.count());
+    JavaRDD<Integer> sample20 = rdd.sample(true, 0.2, 3);
+    Assert.assertEquals(2, sample20.count());
+    JavaRDD<Integer> sample20WithoutReplacement = rdd.sample(false, 0.2, 5);
+    Assert.assertEquals(2, sample20WithoutReplacement.count());
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
index ba67d76..20944b6 100644
--- a/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
+++ b/core/src/test/scala/org/apache/spark/util/random/RandomSamplerSuite.scala
@@ -18,97 +18,523 @@
 package org.apache.spark.util.random
 
 import java.util.Random
-
+import scala.collection.mutable.ArrayBuffer
 import org.apache.commons.math3.distribution.PoissonDistribution
 
-import org.scalatest.{BeforeAndAfter, FunSuite}
-import org.scalatest.mock.EasyMockSugar
-
-class RandomSamplerSuite extends FunSuite with BeforeAndAfter with EasyMockSugar {
-
-  val a = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
-
-  var random: Random = _
-  var poisson: PoissonDistribution = _
-
-  before {
-    random = mock[Random]
-    poisson = mock[PoissonDistribution]
-  }
-
-  test("BernoulliSamplerWithRange") {
-    expecting {
-      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.25, 0.55)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList == List(3, 4, 5))
-    }
-  }
-
-  test("BernoulliSamplerWithRangeInverse") {
-    expecting {
-      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList === List(1, 2, 6, 7, 8, 9))
-    }
-  }
-
-  test("BernoulliSamplerWithRatio") {
-    expecting {
-      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.35)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList == List(1, 2, 3))
-    }
-  }
-
-  test("BernoulliSamplerWithComplement") {
-    expecting {
-      for(x <- Seq(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9)) {
-        random.nextDouble().andReturn(x)
-      }
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.25, 0.55, true)
-      sampler.rng = random
-      assert(sampler.sample(a.iterator).toList == List(1, 2, 6, 7, 8, 9))
-    }
-  }
-
-  test("BernoulliSamplerSetSeed") {
-    expecting {
-      random.setSeed(10L)
-    }
-    whenExecuting(random) {
-      val sampler = new BernoulliSampler[Int](0.2)
-      sampler.rng = random
-      sampler.setSeed(10L)
-    }
-  }
-
-  test("PoissonSampler") {
-    expecting {
-      for(x <- Seq(0, 1, 2, 0, 1, 1, 0, 0, 0)) {
-        poisson.sample().andReturn(x)
-      }
-    }
-    whenExecuting(poisson) {
-      val sampler = new PoissonSampler[Int](0.2)
-      sampler.rng = poisson
-      assert(sampler.sample(a.iterator).toList == List(2, 3, 3, 5, 6))
-    }
+import org.scalatest.{FunSuite, Matchers}
+
+class RandomSamplerSuite extends FunSuite with Matchers {
+  /**
+   * My statistical testing methodology is to run a Kolmogorov-Smirnov (KS) test
+   * between the random samplers and simple reference samplers (known to work correctly).
+   * The sampling gap sizes between chosen samples should show up as having the same
+   * distributions between test and reference, if things are working properly.  That is,
+   * the KS test will fail to strongly reject the null hypothesis that the distributions of
+   * sampling gaps are the same.
+   * There are no actual KS tests implemented for scala (that I can find) - and so what I
+   * have done here is pre-compute "D" - the KS statistic - that corresponds to a "weak"
+   * p-value for a particular sample size.  I can then test that my measured KS stats
+   * are less than D.  Computing D-values is easy, and implemented below.
+   *
+   * I used the scipy 'kstwobign' distribution to pre-compute my D value:
+   *
+   * def ksdval(q=0.1, n=1000):
+   *     en = np.sqrt(float(n) / 2.0)
+   *     return stats.kstwobign.isf(float(q)) / (en + 0.12 + 0.11 / en)
+   *
+   * When comparing KS stats I take the median of a small number of independent test runs
+   * to compensate for the issue that any sampled statistic will show "false positive" with
+   * some probability.  Even when two distributions are the same, they will register as
+   * different 10% of the time at a p-value of 0.1
+   */
+
+  // This D value is the precomputed KS statistic for p-value 0.1, sample size 1000:
+  val sampleSize = 1000
+  val D = 0.0544280747619
+
+  // I'm not a big fan of fixing seeds, but unit testing based on running statistical tests
+  // will always fail with some nonzero probability, so I'll fix the seed to prevent these
+  // tests from generating random failure noise in CI testing, etc.
+  val rngSeed: Random = RandomSampler.newDefaultRNG
+  rngSeed.setSeed(235711)
+
+  // Reference implementation of sampling without replacement (bernoulli)
+  def sample[T](data: Iterator[T], f: Double): Iterator[T] = {
+    val rng: Random = RandomSampler.newDefaultRNG
+    rng.setSeed(rngSeed.nextLong)
+    data.filter(_ => (rng.nextDouble <= f))
+  }
+
+  // Reference implementation of sampling with replacement
+  def sampleWR[T](data: Iterator[T], f: Double): Iterator[T] = {
+    val rng = new PoissonDistribution(f)
+    rng.reseedRandomGenerator(rngSeed.nextLong)
+    data.flatMap { v => {
+      val rep = rng.sample()
+      if (rep == 0) Iterator.empty else Iterator.fill(rep)(v)
+    }}
+  }
+
+  // Returns iterator over gap lengths between samples.
+  // This function assumes input data is integers sampled from the sequence of 
+  // increasing integers: {0, 1, 2, ...}.  This works because that is how I generate them,
+  // and the samplers preserve their input order
+  def gaps(data: Iterator[Int]): Iterator[Int] = {
+    data.sliding(2).withPartial(false).map { x => x(1) - x(0) }
+  }
+
+  // Returns the cumulative distribution from a histogram
+  def cumulativeDist(hist: Array[Int]): Array[Double] = {
+    val n = hist.sum.toDouble
+    assert(n > 0.0)
+    hist.scanLeft(0)(_ + _).drop(1).map { _.toDouble / n }
+  }
+
+  // Returns aligned cumulative distributions from two arrays of data
+  def cumulants(d1: Array[Int], d2: Array[Int],
+      ss: Int = sampleSize): (Array[Double], Array[Double]) = {
+    assert(math.min(d1.length, d2.length) > 0)
+    assert(math.min(d1.min, d2.min)  >=  0)
+    val m = 1 + math.max(d1.max, d2.max)
+    val h1 = Array.fill[Int](m)(0)
+    val h2 = Array.fill[Int](m)(0)
+    for (v <- d1) { h1(v) += 1 }
+    for (v <- d2) { h2(v) += 1 }
+    assert(h1.sum == h2.sum)
+    assert(h1.sum == ss)
+    (cumulativeDist(h1), cumulativeDist(h2))
+  }
+
+  // Computes the Kolmogorov-Smirnov 'D' statistic from two cumulative distributions
+  def KSD(cdf1: Array[Double], cdf2: Array[Double]): Double = {
+    assert(cdf1.length == cdf2.length)
+    val n = cdf1.length
+    assert(n > 0)
+    assert(cdf1(n-1) == 1.0)
+    assert(cdf2(n-1) == 1.0)
+    cdf1.zip(cdf2).map { x => Math.abs(x._1 - x._2) }.max
+  }
+
+  // Returns the median KS 'D' statistic between two samples, over (m) sampling trials
+  def medianKSD(data1: => Iterator[Int], data2: => Iterator[Int], m: Int = 5): Double = {
+    val t = Array.fill[Double](m) {
+      val (c1, c2) = cumulants(data1.take(sampleSize).toArray,
+                               data2.take(sampleSize).toArray)
+      KSD(c1, c2)
+    }.sorted
+    // return the median KS statistic
+    t(m / 2)
+  }
+
+  test("utilities") {
+    val s1 = Array(0, 1, 1, 0, 2)
+    val s2 = Array(1, 0, 3, 2, 1)
+    val (c1, c2) = cumulants(s1, s2, ss = 5)
+    c1 should be (Array(0.4, 0.8, 1.0, 1.0))
+    c2 should be (Array(0.2, 0.6, 0.8, 1.0))
+    KSD(c1, c2) should be (0.2 +- 0.000001)
+    KSD(c2, c1) should be (KSD(c1, c2))
+    gaps(List(0, 1, 1, 2, 4, 11).iterator).toArray should be (Array(1, 0, 1, 2, 7))
+  }
+
+  test("sanity check medianKSD against references") {
+    var d: Double = 0.0
+
+    // should be statistically same, i.e. fail to reject null hypothesis strongly
+    d = medianKSD(gaps(sample(Iterator.from(0), 0.5)), gaps(sample(Iterator.from(0), 0.5)))
+    d should be < D
+
+    // should be statistically different - null hypothesis will have high D value,
+    // corresponding to low p-value that rejects the null hypothesis
+    d = medianKSD(gaps(sample(Iterator.from(0), 0.4)), gaps(sample(Iterator.from(0), 0.5)))
+    d should be > D
+
+    // same!
+    d = medianKSD(gaps(sampleWR(Iterator.from(0), 0.5)), gaps(sampleWR(Iterator.from(0), 0.5)))
+    d should be < D
+
+    // different!
+    d = medianKSD(gaps(sampleWR(Iterator.from(0), 0.5)), gaps(sampleWR(Iterator.from(0), 0.6)))
+    d should be > D
+  }
+
+  test("bernoulli sampling") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+
+    var sampler: RandomSampler[Int, Int] = new BernoulliSampler[Int](0.5)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.5)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.7)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.7)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.9)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.9)))
+    d should be < D
+
+    // sampling at different frequencies should show up as statistically different:
+    sampler = new BernoulliSampler[Int](0.5)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.6)))
+    d should be > D
+  }
+
+  test("bernoulli sampling with gap sampling optimization") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+
+    var sampler: RandomSampler[Int, Int] = new BernoulliSampler[Int](0.01)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.01)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.1)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.3)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.3)))
+    d should be < D
+
+    // sampling at different frequencies should show up as statistically different:
+    sampler = new BernoulliSampler[Int](0.3)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.4)))
+    d should be > D
+  }
+
+  test("bernoulli boundary cases") {
+    val data = (1 to 100).toArray
+
+    var sampler = new BernoulliSampler[Int](0.0)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliSampler[Int](1.0)
+    sampler.sample(data.iterator).toArray should be (data)
+
+    sampler = new BernoulliSampler[Int](0.0 - (RandomSampler.roundingEpsilon / 2.0))
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliSampler[Int](1.0 + (RandomSampler.roundingEpsilon / 2.0))
+    sampler.sample(data.iterator).toArray should be (data)
+  }
+
+  test("bernoulli data types") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+    var sampler = new BernoulliSampler[Int](0.1)
+    sampler.setSeed(rngSeed.nextLong)
+
+    // Array iterator (indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toArray.iterator)),
+      gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+
+    // ArrayBuffer iterator (indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+
+    // List iterator (non-indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toList.iterator)),
+      gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+  }
+
+  test("bernoulli clone") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d = 0.0
+    var sampler = new BernoulliSampler[Int](0.1).clone
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new BernoulliSampler[Int](0.9).clone
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.9)))
+    d should be < D
+  }
+
+  test("bernoulli set seed") {
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+    var sampler1 = new BernoulliSampler[Int](0.2)
+    var sampler2 = new BernoulliSampler[Int](0.2)
+
+    // distributions should be identical if seeds are set same
+    sampler1.setSeed(73)
+    sampler2.setSeed(73)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be (0.0)
+
+    // should be different for different seeds
+    sampler1.setSeed(73)
+    sampler2.setSeed(37)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be > 0.0
+    d should be < D
+
+    sampler1 = new BernoulliSampler[Int](0.8)
+    sampler2 = new BernoulliSampler[Int](0.8)
+
+    // distributions should be identical if seeds are set same
+    sampler1.setSeed(73)
+    sampler2.setSeed(73)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be (0.0)
+
+    // should be different for different seeds
+    sampler1.setSeed(73)
+    sampler2.setSeed(37)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be > 0.0
+    d should be < D
+  }
+
+  test("replacement sampling") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+
+    var sampler = new PoissonSampler[Int](0.5)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.5)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.7)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.7)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.9)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.9)))
+    d should be < D
+
+    // sampling at different frequencies should show up as statistically different:
+    sampler = new PoissonSampler[Int](0.5)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.6)))
+    d should be > D
+  }
+
+  test("replacement sampling with gap sampling") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+
+    var sampler = new PoissonSampler[Int](0.01)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.01)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.1)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.3)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.3)))
+    d should be < D
+
+    // sampling at different frequencies should show up as statistically different:
+    sampler = new PoissonSampler[Int](0.3)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.4)))
+    d should be > D
+  }
+
+  test("replacement boundary cases") {
+    val data = (1 to 100).toArray
+
+    var sampler = new PoissonSampler[Int](0.0)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new PoissonSampler[Int](0.0 - (RandomSampler.roundingEpsilon / 2.0))
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    // sampling with replacement has no upper bound on sampling fraction
+    sampler = new PoissonSampler[Int](2.0)
+    sampler.sample(data.iterator).length should be > (data.length)
+  }
+
+  test("replacement data types") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+    var sampler = new PoissonSampler[Int](0.1)
+    sampler.setSeed(rngSeed.nextLong)
+
+    // Array iterator (indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toArray.iterator)),
+      gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+
+    // ArrayBuffer iterator (indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).to[ArrayBuffer].iterator)),
+      gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+
+    // List iterator (non-indexable type)
+    d = medianKSD(
+      gaps(sampler.sample(Iterator.from(0).take(20*sampleSize).toList.iterator)),
+      gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+  }
+
+  test("replacement clone") {
+    // Tests expect maximum gap sampling fraction to be this value
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d = 0.0
+    var sampler = new PoissonSampler[Int](0.1).clone
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new PoissonSampler[Int](0.9).clone
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sampleWR(Iterator.from(0), 0.9)))
+    d should be < D
+  }
+
+  test("replacement set seed") {
+    RandomSampler.defaultMaxGapSamplingFraction should be (0.4)
+
+    var d: Double = 0.0
+    var sampler1 = new PoissonSampler[Int](0.2)
+    var sampler2 = new PoissonSampler[Int](0.2)
+
+    // distributions should be identical if seeds are set same
+    sampler1.setSeed(73)
+    sampler2.setSeed(73)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be (0.0)
+
+    // should be different for different seeds
+    sampler1.setSeed(73)
+    sampler2.setSeed(37)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be > 0.0
+    d should be < D
+
+    sampler1 = new PoissonSampler[Int](0.8)
+    sampler2 = new PoissonSampler[Int](0.8)
+
+    // distributions should be identical if seeds are set same
+    sampler1.setSeed(73)
+    sampler2.setSeed(73)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be (0.0)
+
+    // should be different for different seeds
+    sampler1.setSeed(73)
+    sampler2.setSeed(37)
+    d = medianKSD(gaps(sampler1.sample(Iterator.from(0))), gaps(sampler2.sample(Iterator.from(0))))
+    d should be > 0.0
+    d should be < D
+  }
+
+  test("bernoulli partitioning sampling") {
+    var d: Double = 0.0
+
+    var sampler = new BernoulliCellSampler[Int](0.1, 0.2)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.1)))
+    d should be < D
+
+    sampler = new BernoulliCellSampler[Int](0.1, 0.2, true)
+    sampler.setSeed(rngSeed.nextLong)
+    d = medianKSD(gaps(sampler.sample(Iterator.from(0))), gaps(sample(Iterator.from(0), 0.9)))
+    d should be < D
+  }
+
+  test("bernoulli partitioning boundary cases") {
+    val data = (1 to 100).toArray
+    val d = RandomSampler.roundingEpsilon / 2.0
+
+    var sampler = new BernoulliCellSampler[Int](0.0, 0.0)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliCellSampler[Int](0.5, 0.5)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliCellSampler[Int](1.0, 1.0)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliCellSampler[Int](0.0, 1.0)
+    sampler.sample(data.iterator).toArray should be (data)
+
+    sampler = new BernoulliCellSampler[Int](0.0 - d, 1.0 + d)
+    sampler.sample(data.iterator).toArray should be (data)
+
+    sampler = new BernoulliCellSampler[Int](0.5, 0.5 - d)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+  }
+
+  test("bernoulli partitioning data") {
+    val seed = rngSeed.nextLong
+    val data = (1 to 100).toArray
+
+    var sampler = new BernoulliCellSampler[Int](0.4, 0.6)
+    sampler.setSeed(seed)
+    val s1 = sampler.sample(data.iterator).toArray
+    s1.length should be > 0
+
+    sampler = new BernoulliCellSampler[Int](0.4, 0.6, true)
+    sampler.setSeed(seed)
+    val s2 = sampler.sample(data.iterator).toArray
+    s2.length should be > 0
+
+    (s1 ++ s2).sorted should be (data)
+
+    sampler = new BernoulliCellSampler[Int](0.5, 0.5)
+    sampler.sample(data.iterator).toArray should be (Array.empty[Int])
+
+    sampler = new BernoulliCellSampler[Int](0.5, 0.5, true)
+    sampler.sample(data.iterator).toArray should be (data)
+  }
+
+  test("bernoulli partitioning clone") {
+    val seed = rngSeed.nextLong
+    val data = (1 to 100).toArray
+    val base = new BernoulliCellSampler[Int](0.35, 0.65)
+
+    var sampler = base.clone
+    sampler.setSeed(seed)
+    val s1 = sampler.sample(data.iterator).toArray
+    s1.length should be > 0
+
+    sampler = base.cloneComplement
+    sampler.setSeed(seed)
+    val s2 = sampler.sample(data.iterator).toArray
+    s2.length should be > 0
+
+    (s1 ++ s2).sorted should be (data)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ad3bd0df/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
index b88e08b..9353351 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/util/MLUtils.scala
@@ -26,7 +26,7 @@ import org.apache.spark.annotation.Experimental
 import org.apache.spark.SparkContext
 import org.apache.spark.rdd.RDD
 import org.apache.spark.rdd.PartitionwiseSampledRDD
-import org.apache.spark.util.random.BernoulliSampler
+import org.apache.spark.util.random.BernoulliCellSampler
 import org.apache.spark.mllib.regression.LabeledPoint
 import org.apache.spark.mllib.linalg.{Vector, Vectors}
 import org.apache.spark.storage.StorageLevel
@@ -244,7 +244,7 @@ object MLUtils {
   def kFold[T: ClassTag](rdd: RDD[T], numFolds: Int, seed: Int): Array[(RDD[T], RDD[T])] = {
     val numFoldsF = numFolds.toFloat
     (1 to numFolds).map { fold =>
-      val sampler = new BernoulliSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF,
+      val sampler = new BernoulliCellSampler[T]((fold - 1) / numFoldsF, fold / numFoldsF,
         complement = false)
       val validation = new PartitionwiseSampledRDD(rdd, sampler, true, seed)
       val training = new PartitionwiseSampledRDD(rdd, sampler.cloneComplement(), true, seed)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org