You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by me...@apache.org on 2014/12/19 06:00:53 UTC

spark git commit: [SPARK-4728][MLLib] Add exponential, gamma, and log normal sampling to MLlib da...

Repository: spark
Updated Branches:
  refs/heads/master c3d91da5e -> ee1fb97a9


[SPARK-4728][MLLib] Add exponential, gamma, and log normal sampling to MLlib da...

...ta generators

This patch adds:

* Exponential, gamma, and log normal generators that wrap Apache Commons math3 to the private API
* Functions for generating exponential, gamma, and log normal RDDs and vector RDDs
* Tests for the above

Author: RJ Nowling <rn...@gmail.com>

Closes #3680 from rnowling/spark4728 and squashes the following commits:

455f50a [RJ Nowling] Add tests for exponential, gamma, and log normal samplers to JavaRandomRDDsSuite
3e1134a [RJ Nowling] Fix val/var, unncessary creation of Distribution objects when setting seeds, and import line longer than line wrap limits
58f5b97 [RJ Nowling] Fix bounds in tests so they scale with variance, not stdev
84fd98d [RJ Nowling] Add more values for testing distributions.
9f96232 [RJ Nowling] [SPARK-4728] Add exponential, gamma, and log normal sampling to MLlib data generators


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/ee1fb97a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/ee1fb97a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/ee1fb97a

Branch: refs/heads/master
Commit: ee1fb97a97d5ac18cd2ad8028e84ecbd988fb811
Parents: c3d91da
Author: RJ Nowling <rn...@gmail.com>
Authored: Thu Dec 18 21:00:49 2014 -0800
Committer: Xiangrui Meng <me...@databricks.com>
Committed: Thu Dec 18 21:00:49 2014 -0800

----------------------------------------------------------------------
 .../mllib/random/RandomDataGenerator.scala      |  69 +++-
 .../apache/spark/mllib/random/RandomRDDs.scala  | 363 +++++++++++++++++++
 .../spark/mllib/random/JavaRandomRDDsSuite.java |  99 +++++
 .../mllib/random/RandomDataGeneratorSuite.scala |  52 ++-
 .../spark/mllib/random/RandomRDDsSuite.scala    |  43 +++
 5 files changed, 622 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/ee1fb97a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
index 51f9b86..405bae6 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomDataGenerator.scala
@@ -17,7 +17,8 @@
 
 package org.apache.spark.mllib.random
 
-import org.apache.commons.math3.distribution.PoissonDistribution
+import org.apache.commons.math3.distribution.{ExponentialDistribution,
+  GammaDistribution, LogNormalDistribution, PoissonDistribution}
 
 import org.apache.spark.annotation.DeveloperApi
 import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
@@ -88,14 +89,76 @@ class StandardNormalGenerator extends RandomDataGenerator[Double] {
 @DeveloperApi
 class PoissonGenerator(val mean: Double) extends RandomDataGenerator[Double] {
 
-  private var rng = new PoissonDistribution(mean)
+  private val rng = new PoissonDistribution(mean)
 
   override def nextValue(): Double = rng.sample()
 
   override def setSeed(seed: Long) {
-    rng = new PoissonDistribution(mean)
     rng.reseedRandomGenerator(seed)
   }
 
   override def copy(): PoissonGenerator = new PoissonGenerator(mean)
 }
+
+/**
+ * :: DeveloperApi ::
+ * Generates i.i.d. samples from the exponential distribution with the given mean.
+ *
+ * @param mean mean for the exponential distribution.
+ */
+@DeveloperApi
+class ExponentialGenerator(val mean: Double) extends RandomDataGenerator[Double] {
+
+  private val rng = new ExponentialDistribution(mean)
+
+  override def nextValue(): Double = rng.sample()
+
+  override def setSeed(seed: Long) {
+    rng.reseedRandomGenerator(seed)
+  }
+
+  override def copy(): ExponentialGenerator = new ExponentialGenerator(mean)
+}
+
+/**
+ * :: DeveloperApi ::
+ * Generates i.i.d. samples from the gamma distribution with the given shape and scale.
+ *
+ * @param shape shape for the gamma distribution.
+ * @param scale scale for the gamma distribution
+ */
+@DeveloperApi
+class GammaGenerator(val shape: Double, val scale: Double) extends RandomDataGenerator[Double] {
+
+  private val rng = new GammaDistribution(shape, scale)
+
+  override def nextValue(): Double = rng.sample()
+
+  override def setSeed(seed: Long) {
+    rng.reseedRandomGenerator(seed)
+  }
+
+  override def copy(): GammaGenerator = new GammaGenerator(shape, scale)
+}
+
+/**
+ * :: DeveloperApi ::
+ * Generates i.i.d. samples from the log normal distribution with the
+ * given mean and standard deviation.
+ *
+ * @param mean mean for the log normal distribution.
+ * @param std standard deviation for the log normal distribution
+ */
+@DeveloperApi
+class LogNormalGenerator(val mean: Double, val std: Double) extends RandomDataGenerator[Double] {
+
+  private val rng = new LogNormalDistribution(mean, std)
+
+  override def nextValue(): Double = rng.sample()
+
+  override def setSeed(seed: Long) {
+    rng.reseedRandomGenerator(seed)
+  }
+
+  override def copy(): LogNormalGenerator = new LogNormalGenerator(mean, std)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/ee1fb97a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
----------------------------------------------------------------------
diff --git a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
index c5f4b08..955c593 100644
--- a/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
+++ b/mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDs.scala
@@ -177,6 +177,176 @@ object RandomRDDs {
   }
 
   /**
+   * Generates an RDD comprised of i.i.d. samples from the exponential distribution with
+   * the input mean.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean Mean, or 1 / lambda, for the exponential distribution.
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+   * @param seed Random seed (default: a random long integer).
+   * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+   */
+  def exponentialRDD(
+      sc: SparkContext,
+      mean: Double,
+      size: Long,
+      numPartitions: Int = 0,
+      seed: Long = Utils.random.nextLong()): RDD[Double] = {
+    val exponential = new ExponentialGenerator(mean)
+    randomRDD(sc, exponential, size, numPartitionsOrDefault(sc, numPartitions), seed)
+  }
+
+  /**
+   * Java-friendly version of [[RandomRDDs#exponentialRDD]].
+   */
+  def exponentialJavaRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      size: Long,
+      numPartitions: Int,
+      seed: Long): JavaDoubleRDD = {
+    JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size, numPartitions, seed))
+  }
+
+  /**
+   * [[RandomRDDs#exponentialJavaRDD]] with the default seed.
+   */
+  def exponentialJavaRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      size: Long,
+      numPartitions: Int): JavaDoubleRDD = {
+    JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size, numPartitions))
+  }
+
+  /**
+   * [[RandomRDDs#exponentialJavaRDD]] with the default number of partitions and the default seed.
+   */
+  def exponentialJavaRDD(jsc: JavaSparkContext, mean: Double, size: Long): JavaDoubleRDD = {
+    JavaDoubleRDD.fromRDD(exponentialRDD(jsc.sc, mean, size))
+  }
+
+  /**
+   * Generates an RDD comprised of i.i.d. samples from the gamma distribution with the input
+   *  shape and scale.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param shape shape parameter (> 0) for the gamma distribution
+   * @param scale scale parameter (> 0) for the gamma distribution  
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+   * @param seed Random seed (default: a random long integer).
+   * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+   */
+  def gammaRDD(
+      sc: SparkContext,
+      shape: Double,
+      scale: Double,
+      size: Long,
+      numPartitions: Int = 0,
+      seed: Long = Utils.random.nextLong()): RDD[Double] = {
+    val gamma = new GammaGenerator(shape, scale)
+    randomRDD(sc, gamma, size, numPartitionsOrDefault(sc, numPartitions), seed)
+  }
+
+  /**
+   * Java-friendly version of [[RandomRDDs#gammaRDD]].
+   */
+  def gammaJavaRDD(
+      jsc: JavaSparkContext,
+      shape: Double,
+      scale: Double,
+      size: Long,
+      numPartitions: Int,
+      seed: Long): JavaDoubleRDD = {
+    JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size, numPartitions, seed))
+  }
+
+  /**
+   * [[RandomRDDs#gammaJavaRDD]] with the default seed.
+   */
+  def gammaJavaRDD(
+      jsc: JavaSparkContext,
+      shape: Double,
+      scale: Double,
+      size: Long,
+      numPartitions: Int): JavaDoubleRDD = {
+    JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size, numPartitions))
+  }
+
+  /**
+   * [[RandomRDDs#gammaJavaRDD]] with the default number of partitions and the default seed.
+   */
+  def gammaJavaRDD(
+    jsc: JavaSparkContext,
+    shape: Double,
+    scale: Double,
+    size: Long): JavaDoubleRDD = {
+    JavaDoubleRDD.fromRDD(gammaRDD(jsc.sc, shape, scale, size))
+  }
+
+  /**
+   * Generates an RDD comprised of i.i.d. samples from the log normal distribution with the input
+   *  mean and standard deviation
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean mean for the log normal distribution
+   * @param std standard deviation for the log normal distribution  
+   * @param size Size of the RDD.
+   * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+   * @param seed Random seed (default: a random long integer).
+   * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
+   */
+  def logNormalRDD(
+      sc: SparkContext,
+      mean: Double,
+      std: Double,
+      size: Long,
+      numPartitions: Int = 0,
+      seed: Long = Utils.random.nextLong()): RDD[Double] = {
+    val logNormal = new LogNormalGenerator(mean, std)
+    randomRDD(sc, logNormal, size, numPartitionsOrDefault(sc, numPartitions), seed)
+  }
+
+  /**
+   * Java-friendly version of [[RandomRDDs#logNormalRDD]].
+   */
+  def logNormalJavaRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      std: Double,
+      size: Long,
+      numPartitions: Int,
+      seed: Long): JavaDoubleRDD = {
+    JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size, numPartitions, seed))
+  }
+
+  /**
+   * [[RandomRDDs#logNormalJavaRDD]] with the default seed.
+   */
+  def logNormalJavaRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      std: Double,
+      size: Long,
+      numPartitions: Int): JavaDoubleRDD = {
+    JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size, numPartitions))
+  }
+
+  /**
+   * [[RandomRDDs#logNormalJavaRDD]] with the default number of partitions and the default seed.
+   */
+  def logNormalJavaRDD(
+    jsc: JavaSparkContext,
+    mean: Double,
+    std: Double,
+    size: Long): JavaDoubleRDD = {
+    JavaDoubleRDD.fromRDD(logNormalRDD(jsc.sc, mean, std, size))
+  }
+
+
+  /**
    * :: DeveloperApi ::
    * Generates an RDD comprised of i.i.d. samples produced by the input RandomDataGenerator.
    *
@@ -308,6 +478,72 @@ object RandomRDDs {
   }
 
   /**
+   * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from a
+   * log normal distribution.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean Mean of the log normal distribution.
+   * @param std Standard deviation of the log normal distribution.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`).
+   * @param seed Random seed (default: a random long integer).
+   * @return RDD[Vector] with vectors containing i.i.d. samples.
+   */
+  def logNormalVectorRDD(
+      sc: SparkContext,
+      mean: Double,
+      std: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int = 0,
+      seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+    val logNormal = new LogNormalGenerator(mean, std)
+    randomVectorRDD(sc, logNormal, numRows, numCols,
+      numPartitionsOrDefault(sc, numPartitions), seed)
+  }
+
+  /**
+   * Java-friendly version of [[RandomRDDs#logNormalVectorRDD]].
+   */
+  def logNormalJavaVectorRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      std: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int,
+      seed: Long): JavaRDD[Vector] = {
+    logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, numPartitions, seed).toJavaRDD()
+  }
+
+  /**
+   * [[RandomRDDs#logNormalJavaVectorRDD]] with the default seed.
+   */
+  def logNormalJavaVectorRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      std: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int): JavaRDD[Vector] = {
+    logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols, numPartitions).toJavaRDD()
+  }
+
+  /**
+   * [[RandomRDDs#logNormalJavaVectorRDD]] with the default number of partitions and
+   * the default seed.
+   */
+  def logNormalJavaVectorRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      std: Double,
+      numRows: Long,
+      numCols: Int): JavaRDD[Vector] = {
+    logNormalVectorRDD(jsc.sc, mean, std, numRows, numCols).toJavaRDD()
+  }
+
+  /**
    * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
    * Poisson distribution with the input mean.
    *
@@ -367,6 +603,133 @@ object RandomRDDs {
   }
 
   /**
+   * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
+   * exponential distribution with the input mean.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param mean Mean, or 1 / lambda, for the Exponential distribution.
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`)
+   * @param seed Random seed (default: a random long integer).
+   * @return RDD[Vector] with vectors containing i.i.d. samples ~ Exp(mean).
+   */
+  def exponentialVectorRDD(
+      sc: SparkContext,
+      mean: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int = 0,
+      seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+    val exponential = new ExponentialGenerator(mean)
+    randomVectorRDD(sc, exponential, numRows, numCols,
+      numPartitionsOrDefault(sc, numPartitions), seed)
+  }
+
+  /**
+   * Java-friendly version of [[RandomRDDs#exponentialVectorRDD]].
+   */
+  def exponentialJavaVectorRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int,
+      seed: Long): JavaRDD[Vector] = {
+    exponentialVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions, seed).toJavaRDD()
+  }
+
+  /**
+   * [[RandomRDDs#exponentialJavaVectorRDD]] with the default seed.
+   */
+  def exponentialJavaVectorRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int): JavaRDD[Vector] = {
+    exponentialVectorRDD(jsc.sc, mean, numRows, numCols, numPartitions).toJavaRDD()
+  }
+
+  /**
+   * [[RandomRDDs#exponentialJavaVectorRDD]] with the default number of partitions
+   * and the default seed.
+   */
+  def exponentialJavaVectorRDD(
+      jsc: JavaSparkContext,
+      mean: Double,
+      numRows: Long,
+      numCols: Int): JavaRDD[Vector] = {
+    exponentialVectorRDD(jsc.sc, mean, numRows, numCols).toJavaRDD()
+  }
+
+
+  /**
+   * Generates an RDD[Vector] with vectors containing i.i.d. samples drawn from the
+   * gamma distribution with the input shape and scale.
+   *
+   * @param sc SparkContext used to create the RDD.
+   * @param shape shape parameter (> 0) for the gamma distribution.
+   * @param scale scale parameter (> 0) for the gamma distribution. 
+   * @param numRows Number of Vectors in the RDD.
+   * @param numCols Number of elements in each Vector.
+   * @param numPartitions Number of partitions in the RDD (default: `sc.defaultParallelism`)
+   * @param seed Random seed (default: a random long integer).
+   * @return RDD[Vector] with vectors containing i.i.d. samples ~ Exp(mean).
+   */
+  def gammaVectorRDD(
+      sc: SparkContext,
+      shape: Double,
+      scale: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int = 0,
+      seed: Long = Utils.random.nextLong()): RDD[Vector] = {
+    val gamma = new GammaGenerator(shape, scale)
+    randomVectorRDD(sc, gamma, numRows, numCols, numPartitionsOrDefault(sc, numPartitions), seed)
+  }
+
+  /**
+   * Java-friendly version of [[RandomRDDs#gammaVectorRDD]].
+   */
+  def gammaJavaVectorRDD(
+      jsc: JavaSparkContext,
+      shape: Double,
+      scale: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int,
+      seed: Long): JavaRDD[Vector] = {
+    gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, numPartitions, seed).toJavaRDD()
+  }
+
+  /**
+   * [[RandomRDDs#gammaJavaVectorRDD]] with the default seed.
+   */
+  def gammaJavaVectorRDD(
+      jsc: JavaSparkContext,
+      shape: Double,
+      scale: Double,
+      numRows: Long,
+      numCols: Int,
+      numPartitions: Int): JavaRDD[Vector] = {
+    gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols, numPartitions).toJavaRDD()
+  }
+
+  /**
+   * [[RandomRDDs#gammaJavaVectorRDD]] with the default number of partitions and the default seed.
+   */
+  def gammaJavaVectorRDD(
+      jsc: JavaSparkContext,
+      shape: Double,
+      scale: Double,
+      numRows: Long,
+      numCols: Int): JavaRDD[Vector] = {
+    gammaVectorRDD(jsc.sc, shape, scale, numRows, numCols).toJavaRDD()
+  }
+
+
+  /**
    * :: DeveloperApi ::
    * Generates an RDD[Vector] with vectors containing i.i.d. samples produced by the
    * input RandomDataGenerator.

http://git-wip-us.apache.org/repos/asf/spark/blob/ee1fb97a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
----------------------------------------------------------------------
diff --git a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
index a725736..fcc13c0 100644
--- a/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
+++ b/mllib/src/test/java/org/apache/spark/mllib/random/JavaRandomRDDsSuite.java
@@ -70,6 +70,21 @@ public class JavaRandomRDDsSuite {
   }
 
   @Test
+  public void testLNormalRDD() {
+    double mean = 4.0;
+    double std = 2.0;
+    long m = 1000L;
+    int p = 2;
+    long seed = 1L;
+    JavaDoubleRDD rdd1 = logNormalJavaRDD(sc, mean, std, m);
+    JavaDoubleRDD rdd2 = logNormalJavaRDD(sc, mean, std, m, p);
+    JavaDoubleRDD rdd3 = logNormalJavaRDD(sc, mean, std, m, p, seed);
+    for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+      Assert.assertEquals(m, rdd.count());
+    }
+  }
+
+  @Test
   public void testPoissonRDD() {
     double mean = 2.0;
     long m = 1000L;
@@ -84,6 +99,36 @@ public class JavaRandomRDDsSuite {
   }
 
   @Test
+  public void testExponentialRDD() {
+    double mean = 2.0;
+    long m = 1000L;
+    int p = 2;
+    long seed = 1L;
+    JavaDoubleRDD rdd1 = exponentialJavaRDD(sc, mean, m);
+    JavaDoubleRDD rdd2 = exponentialJavaRDD(sc, mean, m, p);
+    JavaDoubleRDD rdd3 = exponentialJavaRDD(sc, mean, m, p, seed);
+    for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+      Assert.assertEquals(m, rdd.count());
+    }
+  }
+
+  @Test
+  public void testGammaRDD() {
+    double shape = 1.0;
+    double scale = 2.0;
+    long m = 1000L;
+    int p = 2;
+    long seed = 1L;
+    JavaDoubleRDD rdd1 = gammaJavaRDD(sc, shape, scale, m);
+    JavaDoubleRDD rdd2 = gammaJavaRDD(sc, shape, scale, m, p);
+    JavaDoubleRDD rdd3 = gammaJavaRDD(sc, shape, scale, m, p, seed);
+    for (JavaDoubleRDD rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+      Assert.assertEquals(m, rdd.count());
+    }
+  }
+
+
+  @Test
   @SuppressWarnings("unchecked")
   public void testUniformVectorRDD() {
     long m = 100L;
@@ -117,6 +162,24 @@ public class JavaRandomRDDsSuite {
 
   @Test
   @SuppressWarnings("unchecked")
+  public void testLogNormalVectorRDD() {
+    double mean = 4.0;
+    double std = 2.0;  
+    long m = 100L;
+    int n = 10;
+    int p = 2;
+    long seed = 1L;
+    JavaRDD<Vector> rdd1 = logNormalJavaVectorRDD(sc, mean, std, m, n);
+    JavaRDD<Vector> rdd2 = logNormalJavaVectorRDD(sc, mean, std, m, n, p);
+    JavaRDD<Vector> rdd3 = logNormalJavaVectorRDD(sc, mean, std, m, n, p, seed);
+    for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+      Assert.assertEquals(m, rdd.count());
+      Assert.assertEquals(n, rdd.first().size());
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
   public void testPoissonVectorRDD() {
     double mean = 2.0;
     long m = 100L;
@@ -131,4 +194,40 @@ public class JavaRandomRDDsSuite {
       Assert.assertEquals(n, rdd.first().size());
     }
   }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testExponentialVectorRDD() {
+    double mean = 2.0;
+    long m = 100L;
+    int n = 10;
+    int p = 2;
+    long seed = 1L;
+    JavaRDD<Vector> rdd1 = exponentialJavaVectorRDD(sc, mean, m, n);
+    JavaRDD<Vector> rdd2 = exponentialJavaVectorRDD(sc, mean, m, n, p);
+    JavaRDD<Vector> rdd3 = exponentialJavaVectorRDD(sc, mean, m, n, p, seed);
+    for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+      Assert.assertEquals(m, rdd.count());
+      Assert.assertEquals(n, rdd.first().size());
+    }
+  }
+
+  @Test
+  @SuppressWarnings("unchecked")
+  public void testGammaVectorRDD() {
+    double shape = 1.0;
+    double scale = 2.0;
+    long m = 100L;
+    int n = 10;
+    int p = 2;
+    long seed = 1L;
+    JavaRDD<Vector> rdd1 = gammaJavaVectorRDD(sc, shape, scale, m, n);
+    JavaRDD<Vector> rdd2 = gammaJavaVectorRDD(sc, shape, scale, m, n, p);
+    JavaRDD<Vector> rdd3 = gammaJavaVectorRDD(sc, shape, scale, m, n, p, seed);
+    for (JavaRDD<Vector> rdd: Lists.newArrayList(rdd1, rdd2, rdd3)) {
+      Assert.assertEquals(m, rdd.count());
+      Assert.assertEquals(n, rdd.first().size());
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee1fb97a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala
index 3df7c12..b792d81 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomDataGeneratorSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.mllib.random
 
+import scala.math
+
 import org.scalatest.FunSuite
 
 import org.apache.spark.util.StatCounter
@@ -25,7 +27,6 @@ import org.apache.spark.util.StatCounter
 class RandomDataGeneratorSuite extends FunSuite {
 
   def apiChecks(gen: RandomDataGenerator[Double]) {
-
     // resetting seed should generate the same sequence of random numbers
     gen.setSeed(42L)
     val array1 = (0 until 1000).map(_ => gen.nextValue())
@@ -79,6 +80,26 @@ class RandomDataGeneratorSuite extends FunSuite {
     distributionChecks(normal, 0.0, 1.0)
   }
 
+  test("LogNormalGenerator") {
+    List((0.0, 1.0), (0.0, 2.0), (2.0, 1.0), (2.0, 2.0)).map {
+      case (mean: Double, vari: Double) =>
+        val normal = new LogNormalGenerator(mean, math.sqrt(vari))
+        apiChecks(normal)
+
+        // mean of log normal = e^(mean + var / 2)
+        val expectedMean = math.exp(mean + 0.5 * vari)
+
+        // variance of log normal = (e^var - 1) * e^(2 * mean + var)
+        val expectedStd = math.sqrt((math.exp(vari) - 1.0) * math.exp(2.0 * mean + vari))
+
+        // since sampling error increases with variance, let's set
+        // the absolute tolerance as a percentage
+        val epsilon = 0.05 * expectedStd * expectedStd
+
+        distributionChecks(normal, expectedMean, expectedStd, epsilon)
+    }
+  }
+
   test("PoissonGenerator") {
     // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
     for (mean <- List(1.0, 5.0, 100.0)) {
@@ -87,4 +108,33 @@ class RandomDataGeneratorSuite extends FunSuite {
       distributionChecks(poisson, mean, math.sqrt(mean), 0.1)
     }
   }
+
+  test("ExponentialGenerator") {
+    // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
+    for (mean <- List(2.0, 5.0, 10.0, 50.0, 100.0)) {
+      val exponential = new ExponentialGenerator(mean)
+      apiChecks(exponential)
+      // var of exp = lambda^-2 = (1.0 / mean)^-2 = mean^2
+
+      // since sampling error increases with variance, let's set
+      // the absolute tolerance as a percentage
+      val epsilon = 0.05 * mean * mean
+
+      distributionChecks(exponential, mean, mean, epsilon)
+    }
+  }
+
+  test("GammaGenerator") {
+    // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
+    List((1.0, 2.0), (2.0, 2.0), (3.0, 2.0), (5.0, 1.0), (9.0, 0.5)).map {
+      case (shape: Double, scale: Double) =>
+        val gamma = new GammaGenerator(shape, scale)
+        apiChecks(gamma)
+        // mean of gamma = shape * scale
+        val expectedMean = shape * scale
+        // var of gamma = shape * scale^2
+        val expectedStd = math.sqrt(shape * scale * scale)
+        distributionChecks(gamma, expectedMean, expectedStd, 0.1)
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/ee1fb97a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala
----------------------------------------------------------------------
diff --git a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala
index ea5889b..6395188 100644
--- a/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala
+++ b/mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDsSuite.scala
@@ -110,7 +110,19 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
   test("randomRDD for different distributions") {
     val size = 100000L
     val numPartitions = 10
+
+    //  mean of log normal = e^(mean + var / 2)
+    val logNormalMean = math.exp(0.5)
+    // variance of log normal = (e^var - 1) * e^(2 * mean + var)
+    val logNormalStd = math.sqrt((math.E - 1.0) * math.E)
+    val gammaScale = 1.0
+    val gammaShape = 2.0
+    // mean of gamma = shape * scale
+    val gammaMean = gammaShape * gammaScale
+    // var of gamma = shape * scale^2
+    val gammaStd = math.sqrt(gammaShape * gammaScale * gammaScale)
     val poissonMean = 100.0
+    val exponentialMean = 1.0
 
     for (seed <- 0 until 5) {
       val uniform = RandomRDDs.uniformRDD(sc, size, numPartitions, seed)
@@ -119,8 +131,18 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
       val normal = RandomRDDs.normalRDD(sc, size, numPartitions, seed)
       testGeneratedRDD(normal, size, numPartitions, 0.0, 1.0)
 
+      val logNormal = RandomRDDs.logNormalRDD(sc, 0.0, 1.0, size, numPartitions, seed)
+      testGeneratedRDD(logNormal, size, numPartitions, logNormalMean, logNormalStd, 0.1)
+
       val poisson = RandomRDDs.poissonRDD(sc, poissonMean, size, numPartitions, seed)
       testGeneratedRDD(poisson, size, numPartitions, poissonMean, math.sqrt(poissonMean), 0.1)
+
+      val exponential = RandomRDDs.exponentialRDD(sc, exponentialMean, size, numPartitions, seed)
+      testGeneratedRDD(exponential, size, numPartitions, exponentialMean, exponentialMean, 0.1)
+
+      val gamma = RandomRDDs.gammaRDD(sc, gammaShape, gammaScale, size, numPartitions, seed)
+      testGeneratedRDD(gamma, size, numPartitions, gammaMean, gammaStd, 0.1)
+
     }
 
     // mock distribution to check that partitions have unique seeds
@@ -132,7 +154,19 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
     val rows = 1000L
     val cols = 100
     val parts = 10
+
+    //  mean of log normal = e^(mean + var / 2)
+    val logNormalMean = math.exp(0.5)
+    // variance of log normal = (e^var - 1) * e^(2 * mean + var)
+    val logNormalStd = math.sqrt((math.E - 1.0) * math.E)
+    val gammaScale = 1.0
+    val gammaShape = 2.0
+    // mean of gamma = shape * scale
+    val gammaMean = gammaShape * gammaScale
+    // var of gamma = shape * scale^2
+    val gammaStd = math.sqrt(gammaShape * gammaScale * gammaScale)
     val poissonMean = 100.0
+    val exponentialMean = 1.0
 
     for (seed <- 0 until 5) {
       val uniform = RandomRDDs.uniformVectorRDD(sc, rows, cols, parts, seed)
@@ -141,8 +175,17 @@ class RandomRDDsSuite extends FunSuite with MLlibTestSparkContext with Serializa
       val normal = RandomRDDs.normalVectorRDD(sc, rows, cols, parts, seed)
       testGeneratedVectorRDD(normal, rows, cols, parts, 0.0, 1.0)
 
+      val logNormal = RandomRDDs.logNormalVectorRDD(sc, 0.0, 1.0, rows, cols, parts, seed)
+      testGeneratedVectorRDD(logNormal, rows, cols, parts, logNormalMean, logNormalStd, 0.1)
+
       val poisson = RandomRDDs.poissonVectorRDD(sc, poissonMean, rows, cols, parts, seed)
       testGeneratedVectorRDD(poisson, rows, cols, parts, poissonMean, math.sqrt(poissonMean), 0.1)
+
+      val exponential = RandomRDDs.exponentialVectorRDD(sc, exponentialMean, rows, cols, parts, seed)
+      testGeneratedVectorRDD(exponential, rows, cols, parts, exponentialMean, exponentialMean, 0.1)
+
+      val gamma = RandomRDDs.gammaVectorRDD(sc, gammaShape, gammaScale, rows, cols, parts, seed)
+      testGeneratedVectorRDD(gamma, rows, cols, parts, gammaMean, gammaStd, 0.1)
     }
   }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org