Posted to reviews@spark.apache.org by dorx <gi...@git.apache.org> on 2014/07/22 05:50:24 UTC

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

GitHub user dorx opened a pull request:

    https://github.com/apache/spark/pull/1520

    [SPARK-2514] [mllib] Random RDD generator

    Utilities for generating random RDDs.
    
    `RandomRDD` and `RandomVectorRDD` are introduced instead of using `sc.parallelize(range: Range)` because a Scala `Range` can hold at most `Int.MaxValue` elements.
    
    The object `RandomRDDGenerators` can be transformed into a generator class to reduce the number of auxiliary methods for optional arguments. 
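
As an illustration of the generator-class idea, here is a minimal sketch in which the optional arguments become fields with defaults, so a single entry point could replace the overloads. `RandomSpec`, `withNumPartitions`, `withSeed`, and `resolve` are hypothetical names and are not part of this pull request.

```scala
// Hypothetical builder-style spec; none of these names come from the PR.
final case class RandomSpec(
    size: Long,
    numPartitions: Option[Int] = None,
    seed: Option[Long] = None) {

  def withNumPartitions(n: Int): RandomSpec = copy(numPartitions = Some(n))

  def withSeed(s: Long): RandomSpec = copy(seed = Some(s))

  // Fill in unset options: the caller supplies the default parallelism
  // and a fresh seed, which is only evaluated when no seed was set.
  def resolve(defaultParallelism: Int, freshSeed: => Long): (Long, Int, Long) =
    (size, numPartitions.getOrElse(defaultParallelism), seed.getOrElse(freshSeed))
}
```

A caller would then write something like `RandomSpec(1000L).withSeed(42L)` and leave the number of partitions to the context's default.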

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/dorx/spark randomRDD

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/1520.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #1520
    
----
commit 888144416ced2b6d4c4839ac95b8a3feb2b3aba1
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-12T01:02:01Z

    RandomRDDGenerator: initial design
    
    Looking for feedback on design decisions. Very rough draft and untested.

commit 7cb0e406793db493cee72cb91ec02475c95c8de7
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-12T01:15:56Z

    fix for data inconsistency

commit 49ed20d9a30b0ba5d809974bbcf48cc76a45d68e
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-12T01:30:15Z

    alternative poisson distribution generator

commit f46d928c4e3e71ced4ede9295ef645fb714c9a69
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-19T02:13:58Z

    WIP

commit df5bcffc320bab85f6c5925b244fe9885d6d0eb5
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-21T07:47:07Z

    Merge branch 'generator' into randomRDD

commit 92d6f1c3ca0f22371f7f0387b875ac16d5030ffb
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-21T07:48:12Z

    solution for Cloneable

commit d56cacbde7a0550f53b59696ad7c7014c827f3f7
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-22T01:23:19Z

    impl with RandomRDD

commit bc90234c9639bfb3f4581af63cf4bf370c61e18b
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-22T03:37:40Z

    units passed.

commit aec68eb167ac9f11c64d95c698009cbf8919bd4b
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-22T03:42:31Z

    newline

commit 063ea0b48b769f7f8477ca2364f8e676f93c297e
Author: Doris Xin <do...@gmail.com>
Date:   2014-07-22T03:43:57Z

    Merge branch 'master' into randomRDD

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389599
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,92 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    --- End diff --
    
    `i.i.d` -> `i.i.d.`, here and in other places.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389761
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
    @@ -0,0 +1,171 @@
    +
    +package org.apache.spark.mllib.random
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.StatCounter
    +
    +/**
    + * Note: avoid including APIs that do not set the seed for the RNG in unit tests
    + * in order to guarantee deterministic behavior.
    + *
    + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
    + */
    +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
    +
    +  def testGeneratedRDD(rdd: RDD[Double],
    +      expectedSize: Long,
    +      expectedNumPartitions: Int,
    +      expectedMean: Double,
    +      expectedStddev: Double,
    +      epsilon: Double = 0.01) {
    +    val stats = rdd.stats()
    +    assert(expectedSize === stats.count)
    +    assert(expectedNumPartitions === rdd.partitions.size)
    +    assert(math.abs(stats.mean - expectedMean) < epsilon)
    +    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
    +  }
    +
    +  // assume test RDDs are small
    +  def testGeneratedVectorRDD(rdd: RDD[Vector],
    +      expectedRows: Long,
    +      expectedColumns: Int,
    +      expectedNumPartitions: Int,
    +      expectedMean: Double,
    +      expectedStddev: Double,
    +      epsilon: Double = 0.01) {
    +    assert(expectedNumPartitions === rdd.partitions.size)
    +    val values = new ArrayBuffer[Double]()
    +    rdd.collect.foreach { vector => {
    +      assert(vector.size === expectedColumns)
    +      values ++= vector.toArray
    +    }}
    +    assert(expectedRows === values.size / expectedColumns)
    +    val stats = new StatCounter(values)
    +    assert(math.abs(stats.mean - expectedMean) < epsilon)
    +    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
    +  }
    +
    +  test("RandomRDD sizes") {
    +
    +    // some cases where size % numParts != 0 to test getPartitions behaves correctly
    +    for ((size, numPartitions) <- List((10000, 6), (12345, 1), (1000, 101))) {
    +      val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
    +      assert(rdd.count() === size)
    +      assert(rdd.partitions.size === numPartitions)
    +
    +      // check that partition sizes are balanced
    +      val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
    +      val partStats = new StatCounter(partSizes)
    +      assert(partStats.stdev < 1.0)
    +    }
    +
    +    // size > Int.MaxValue
    +    val size = Int.MaxValue.toLong * 100L
    +    val numPartitions = 101
    +    val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
    +    assert(rdd.partitions.size === numPartitions)
    +    val count = rdd.partitions.foldLeft(0L){
    +      (count, part) => count + part.asInstanceOf[RandomRDDPartition].size
    --- End diff --
    
    Move `(count, part) =>` to the line above and insert a space between `)` and `{`.
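
For reference, the requested layout looks roughly like the sketch below. `Part` is a hypothetical stand-in for `RandomRDDPartition`, so the cast used in the original test is not needed here.

```scala
object FoldStyle {
  // Stand-in for RandomRDDPartition: only the `size` field matters here.
  final case class Part(size: Long)

  // Lambda parameters sit on the same line as the opening brace,
  // with a space between `)` and `{`.
  def totalSize(parts: Seq[Part]): Long =
    parts.foldLeft(0L) { (count, part) =>
      count + part.size
    }
}
```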



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15266470
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    --- End diff --
    
    Switching to the slicing logic from `ParallelCollectionRDD` for more balanced partition sizes.
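
A minimal sketch of that slicing rule, adapted to `Long` sizes: partition `i` covers the half-open index range from `i * size / numSlices` to `(i + 1) * size / numSlices`. The object and method names are assumptions for illustration, and the product `(i + 1) * size` is assumed to fit in a `Long`.

```scala
object BalancedSlices {
  // Number of elements in partition i when `size` elements are split
  // into `numSlices` contiguous ranges.
  def partitionSize(size: Long, numSlices: Int, i: Int): Long = {
    val start = (i * size) / numSlices
    val end = ((i + 1) * size) / numSlices
    end - start
  }

  // Sizes of all partitions, in partition order.
  def sizes(size: Long, numSlices: Int): Seq[Long] =
    (0 until numSlices).map(i => partitionSize(size, numSlices, i))
}
```

With `size = 1000` and `numSlices = 101`, every partition receives either 9 or 10 elements.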



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15266943
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
    @@ -0,0 +1,235 @@
    +
    +package org.apache.spark.mllib.random
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +// TODO add Scaladocs once API fully approved
    +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring
    +// down the number of methods here.
    +object RandomRDDGenerators {
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val uniform = new UniformGenerator()
    +    randomRDD(sc, size, numPartitions, uniform, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val normal = new StandardNormalGenerator()
    +    randomRDD(sc, size, numPartitions, normal, seed)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    normalRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    normalRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  def poissonRDD(sc: SparkContext,
    +      size: Long,
    +      numPartitions: Int,
    +      mean: Double,
    +      seed: Long): RDD[Double] = {
    +    val poisson = new PoissonGenerator(mean)
    +    randomRDD(sc, size, numPartitions, poisson, seed)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = {
    +    poissonRDD(sc, size, sc.defaultParallelism, mean, seed)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = {
    +    poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = {
    +    poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong)
    +  }
    +
    +  def randomRDD(sc: SparkContext,
    +      size: Long,
    +      numPartitions: Int,
    +      distribution: DistributionGenerator,
    --- End diff --
    
    Copy-paste fail. Meant to change all of them to `rng`.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213096
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    --- End diff --
    
    This is not evenly distributed when, for example, `size = 1000` and `numSlices = 101`.
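
To make the imbalance concrete, the sketch below reproduces the `firstPartitionSize` formula from the diff. It assumes the remaining partitions each receive `size / numSlices` elements, which the quoted hunk does not show; `FirstHeavySlices` is a hypothetical name.

```scala
object FirstHeavySlices {
  // The first partition takes the base share plus the whole remainder;
  // every other partition is assumed to take just the base share.
  def sizes(size: Long, numSlices: Int): Seq[Long] = {
    val base = size / numSlices
    val first = base + size % numSlices
    first +: Seq.fill(numSlices - 1)(base)
  }
}
```

For `size = 1000` and `numSlices = 101` this gives one partition of 100 elements and 100 partitions of 9 elements each.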



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15423644
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala ---
    @@ -0,0 +1,91 @@
    +
    +package org.apache.spark.mllib.random
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.util.StatCounter
    +
    +// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
    +class DistributionGeneratorSuite extends FunSuite {
    +
    +  def apiChecks(gen: DistributionGenerator) {
    +
    +    // resetting seed should generate the same sequence of random numbers
    +    gen.setSeed(42L)
    +    val array1 = (0 until 1000).map(_ => gen.nextValue())
    +    gen.setSeed(42L)
    +    val array2 = (0 until 1000).map(_ => gen.nextValue())
    +    assert(array1.equals(array2))
    +
    +    // copy() should contain a different instance of the rng,
    +    // i.e. setting different seeds for different instances produces different sequences of
    +    // random numbers.
    +    val gen2 = gen.copy()
    +    gen.setSeed(0L)
    +    val array3 = (0 until 1000).map(_ => gen.nextValue())
    +    gen2.setSeed(1L)
    +    val array4 = (0 until 1000).map(_ => gen2.nextValue())
    +    // Compare arrays instead of elements since individual elements can coincide by chance but the
    +    // sequences should differ given two different seeds.
    +    assert(!array3.equals(array4))
    +
    +    // test that setting the same seed in the copied instance produces the same sequence of numbers
    +    gen.setSeed(0L)
    +    val array5 = (0 until 1000).map(_ => gen.nextValue())
    +    gen2.setSeed(0L)
    +    val array6 = (0 until 1000).map(_ => gen2.nextValue())
    +    assert(array5.equals(array6))
    +  }
    +
    +  def distributionChecks(gen: DistributionGenerator,
    +      mean: Double = 0.0,
    +      stddev: Double = 1.0,
    +      epsilon: Double = 1e-3) {
    +    for (seed <- 0 until 5) {
    +      gen.setSeed(seed.toLong)
    +      val sample = (0 until 10000000).map { _ => gen.nextValue()}
    --- End diff --
    
    Old tests ran for 21 s on my laptop. Shaved a couple of 0s off everywhere and now it finishes in 986 milliseconds.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213062
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,105 @@
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    + */
    +trait DistributionGenerator extends Pseudorandom with Serializable {
    +
    +  /**
    +   * @return An i.i.d sample as a Double from an underlying distribution.
    +   */
    +  def nextValue(): Double
    +
    +  /**
    +   * @return A copy of the DistributionGenerator with a new instance of the rng object used in the
    +   *         class when applicable. Each partition has a unique seed and therefore requires its
    +   *         own instance of the DistributionGenerator.
    +   */
    +  def newInstance(): DistributionGenerator
    --- End diff --
    
    I saw your argument about using `clone`. Between `copy` and `newInstance`, I think `copy` is better. For example, in Poisson, we need to copy the mean, which is not reflected in `newInstance`.
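
A rough sketch of why `copy` is the more descriptive name: the copy has to carry over distribution parameters such as the Poisson mean while getting its own rng. `Gen` and `PoissonGen` are hypothetical stand-ins for the PR's trait and generator, and the sampler uses `java.util.Random` with Knuth's multiplication method rather than the Colt classes imported in the diff.

```scala
import java.util.Random

// Hypothetical stand-in for the DistributionGenerator trait.
trait Gen extends Serializable {
  def setSeed(seed: Long): Unit
  def nextValue(): Double
  def copy(): Gen
}

// Hypothetical Poisson generator; only reasonable for small means.
final class PoissonGen(val mean: Double) extends Gen {
  private val rng = new Random()

  override def setSeed(seed: Long): Unit = rng.setSeed(seed)

  // Knuth's method: multiply uniforms until the product drops to exp(-mean).
  override def nextValue(): Double = {
    val limit = math.exp(-mean)
    var k = 0
    var p = rng.nextDouble()
    while (p > limit) {
      k += 1
      p *= rng.nextDouble()
    }
    k.toDouble
  }

  // The mean is copied; the rng is a fresh instance owned by the copy.
  override def copy(): PoissonGen = new PoissonGen(mean)
}
```

Each partition could then call `copy()` on the shared generator and seed its own instance independently.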



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213078
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    --- End diff --
    
    Could you double check whether we can have more than `Int.MaxValue` items in a single partition? It may break storage and a couple of RDD functions like `glom`.
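
One possible way to guard against this, sketched under the assumption that a partition must hold at most `Int.MaxValue` items (for example because materialising a whole partition as an array is bounded by the JVM array length). `PartitionSizeSketch` and `partitionSizes` are hypothetical names; the start/end arithmetic mirrors the `getPartitions` loop in the diff.

```scala
object PartitionSizeSketch {

  // Returns the number of items assigned to each of the numSlices partitions.
  def partitionSizes(size: Long, numSlices: Int): Array[Int] = {
    require(size > 0, "Positive RDD size required.")
    require(numSlices > 0, "Positive number of partitions required")
    // No partition receives more than ceil(size / numSlices) items.
    require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue,
      "Partition size cannot exceed Int.MaxValue")

    val sizes = new Array[Int](numSlices)
    var start = 0L
    var i = 0
    while (i < numSlices) {
      // Widen to Long before multiplying so the product is not computed as an Int.
      val end = ((i + 1).toLong * size) / numSlices
      sizes(i) = (end - start).toInt
      start = end
      i += 1
    }
    sizes
  }
}
```

With this check, a total size above `Int.MaxValue` is still allowed as long as it is spread over enough partitions.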



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389665
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +import scala.util.Random
    +
    +private[mllib] class RandomRDDPartition(override val index: Int,
    +    val size: Int,
    +    val generator: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
    +  require(size > 0, "Positive partition size required.")
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue,
    +    "Partition size cannot exceed Int.MaxValue")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +  require(math.ceil(size.toDouble / numSlices) <= Int.MaxValue,
    +    "Partition size cannot exceed Int.MaxValue")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val partitions = new Array[RandomRDDPartition](numSlices)
    +    var i = 0
    +    var start: Long = 0
    +    var end: Long = 0
    +    val random = new Random(seed)
    +    while (i < numSlices) {
    +      end = ((i + 1) * size) / numSlices
    +      partitions(i) = new RandomRDDPartition(i, (end - start).toInt, rng, random.nextLong())
    +      start = end
    +      i += 1
    +    }
    +    partitions.asInstanceOf[Array[Partition]]
    +  }
    +
    +  // The RNG has to be reset every time the iterator is requested to guarantee same data
    +  // every time the content of the RDD is examined.
    +  def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
    +    val generator = partition.generator.copy()
    +    generator.setSeed(partition.seed)
    +    Iterator.fill(partition.size)(generator.nextValue())
    +  }
    +
    +  // The RNG has to be reset every time the iterator is requested to guarantee same data
    +  // every time the content of the RDD is examined.
    +  def getVectorIterator(partition: RandomRDDPartition, vectorSize: Int): Iterator[Vector] = {
    +    val generator = partition.generator.copy()
    +    generator.setSeed(partition.seed)
    +    Iterator.fill(partition.size)(new DenseVector(
    +      (0 until vectorSize).map { _ => generator.nextValue() }.toArray))
    --- End diff --
    
    `Array.fill(vectorSize)(generator.nextValue())` may be faster because it doesn't create a temp object.
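
A small sketch of the two variants side by side, assuming a plain `() => Double` supplier in place of `generator.nextValue()`; `VectorFillSketch`, `viaRange` and `viaFill` are hypothetical names. Both draw `n` values in order, so with the same seed they should yield the same array; the difference is only in the intermediate collection.

```scala
object VectorFillSketch {

  // Variant from the diff: maps over a Range into an intermediate
  // collection, then copies that collection into an array.
  def viaRange(n: Int, next: () => Double): Array[Double] =
    (0 until n).map { _ => next() }.toArray

  // Suggested variant: evaluates next() once per element and writes
  // straight into the result array.
  def viaFill(n: Int, next: () => Double): Array[Double] =
    Array.fill(n)(next())
}
```

Whether the second form is measurably faster would need a benchmark on the actual generator; the sketch only shows that the results are interchangeable.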



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-49695427
  
    QA tests have started for PR 1520. This patch merges cleanly. View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16942/consoleFull



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389602
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,92 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    + */
    +trait DistributionGenerator extends Pseudorandom with Serializable {
    +
    +  /**
    +   * Returns an i.i.d sample as a Double from an underlying distribution.
    +   */
    +  def nextValue(): Double
    +
    +  /**
    +   * Returns a copy of the DistributionGenerator with a new instance of the rng object used in the
    +   * class when applicable for non-locking concurrent usage.
    +   */
    +  def copy(): DistributionGenerator
    +}
    +
    +/**
    + * Generates i.i.d. samples from U[0.0, 1.0]
    + */
    +class UniformGenerator extends DistributionGenerator {
    +
    +  // XORShiftRandom for better performance. Thread safety isn't necessary here.
    +  private val random = new XORShiftRandom()
    +
    +  override def nextValue(): Double = {
    +    random.nextDouble()
    +  }
    +
    +  override def setSeed(seed: Long) = random.setSeed(seed)
    +
    +  override def copy(): UniformGenerator = new UniformGenerator()
    +}
    +
    +/**
    + * Generates i.i.d. samples from the Standard Normal Distribution.
    --- End diff --
    
    `Standard Normal Distribution` -> `standard normal distribution`



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15390040
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +import scala.util.Random
    +
    +private[mllib] class RandomRDDPartition(override val index: Int,
    +    val size: Int,
    +    val generator: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
    +  require(size > 0, "Positive partition size required.")
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    --- End diff --
    
    Could it be a `val`?



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213064
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    + */
    +trait DistributionGenerator extends Pseudorandom with Serializable {
    +
    +  /**
    +   * @return An i.i.d sample as a Double from an underlying distribution.
    +   */
    +  def nextValue(): Double
    +
    +  /**
    +   * @return A copy of the DistributionGenerator with a new instance of the rng object used in the
    +   *         class when applicable. Each partition has a unique seed and therefore requires its
    +   *         own instance of the DistributionGenerator.
    +   */
    +  def newInstance(): DistributionGenerator
    +}
    +
    +/**
    + * Generates i.i.d. samples from U[0.0, 1.0]
    + */
    +class UniformGenerator() extends DistributionGenerator {
    +
    +  // XORShiftRandom for better performance. Thread safety isn't necessary here.
    +  private val random = new XORShiftRandom()
    +
    +  /**
    +   * @return An i.i.d sample as a Double from U[0.0, 1.0].
    +   */
    +  override def nextValue(): Double = {
    +    random.nextDouble()
    +  }
    +
    +  /** Set random seed. */
    --- End diff --
    
    Doc is not necessary for overridden methods, unless you want to update it.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389618
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
    @@ -0,0 +1,422 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +/**
    + * Generator methods for creating RDDs comprised of i.i.d samples from some distribution.
    + *
    + * TODO Generate RDD[Vector] from multivariate distributions.
    + */
    +object RandomRDDGenerators {
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param size Size of the RDD.
    +   * @param numPartitions Number of partitions in the RDD.
    +   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
    +   * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
    +   */
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val uniform = new UniformGenerator()
    +    randomRDD(sc, uniform,  size, numPartitions, seed)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param size Size of the RDD.
    +   * @param numPartitions Number of partitions in the RDD.
    +   * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
    +   */
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples from the uniform distribution on [0.0, 1.0].
    +   * sc.defaultParallelism used for the number of partitions in the RDD.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param size Size of the RDD.
    +   * @return RDD[Double] comprised of i.i.d. samples ~ U[0.0, 1.0].
    +   */
    +  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param size Size of the RDD.
    +   * @param numPartitions Number of partitions in the RDD.
    +   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
    +   * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
    +   */
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val normal = new StandardNormalGenerator()
    +    randomRDD(sc, normal, size, numPartitions, seed)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param size Size of the RDD.
    +   * @param numPartitions Number of partitions in the RDD.
    +   * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
    +   */
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    normalRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples from the standard normal distribution.
    +   * sc.defaultParallelism used for the number of partitions in the RDD.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param size Size of the RDD.
    +   * @return RDD[Double] comprised of i.i.d. samples ~ N(0.0, 1.0).
    +   */
    +  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param mean Mean, or lambda, for the Poisson distribution.
    +   * @param size Size of the RDD.
    +   * @param numPartitions Number of partitions in the RDD.
    +   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
    +   * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
    +   */
    +  def poissonRDD(sc: SparkContext,
    +      mean: Double,
    +      size: Long,
    +      numPartitions: Int,
    +      seed: Long): RDD[Double] = {
    +    val poisson = new PoissonGenerator(mean)
    +    randomRDD(sc, poisson, size, numPartitions, seed)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param mean Mean, or lambda, for the Poisson distribution.
    +   * @param size Size of the RDD.
    +   * @param numPartitions Number of partitions in the RDD.
    +   * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
    +   */
    +  def poissonRDD(sc: SparkContext, mean: Double, size: Long, numPartitions: Int): RDD[Double] = {
    +    poissonRDD(sc, mean, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples from the Poisson distribution with the input mean.
    +   * sc.defaultParallelism used for the number of partitions in the RDD.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param mean Mean, or lambda, for the Poisson distribution.
    +   * @param size Size of the RDD.
    +   * @return RDD[Double] comprised of i.i.d. samples ~ Pois(mean).
    +   */
    +  def poissonRDD(sc: SparkContext, mean: Double, size: Long): RDD[Double] = {
    +    poissonRDD(sc, mean, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param generator DistributionGenerator used to populate the RDD.
    +   * @param size Size of the RDD.
    +   * @param numPartitions Number of partitions in the RDD.
    +   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
    +   * @return RDD[Double] comprised of i.i.d. samples produced by generator.
    +   */
    +  def randomRDD(sc: SparkContext,
    +      generator: DistributionGenerator,
    +      size: Long,
    +      numPartitions: Int,
    +      seed: Long): RDD[Double] = {
    +    new RandomRDD(sc, size, numPartitions, generator, seed)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param generator DistributionGenerator used to populate the RDD.
    +   * @param size Size of the RDD.
    +   * @param numPartitions Number of partitions in the RDD.
    +   * @return RDD[Double] comprised of i.i.d. samples produced by generator.
    +   */
    +  def randomRDD(sc: SparkContext,
    +      generator: DistributionGenerator,
    +      size: Long,
    +      numPartitions: Int): RDD[Double] = {
    +    randomRDD(sc, generator, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  /**
    +   * Generates an RDD comprised of i.i.d samples produced by the input DistributionGenerator.
    +   * sc.defaultParallelism used for the number of partitions in the RDD.
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param generator DistributionGenerator used to populate the RDD.
    +   * @param size Size of the RDD.
    +   * @return RDD[Double] comprised of i.i.d. samples produced by generator.
    +   */
    +  def randomRDD(sc: SparkContext,
    +      generator: DistributionGenerator,
    +      size: Long): RDD[Double] = {
    +    randomRDD(sc, generator, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  /**
    +   * Generates an RDD[Vector] with vectors containing i.i.d samples drawn from the
    +   * uniform distribution on [0.0 1.0].
    +   *
    +   * @param sc SparkContext used to create the RDD.
    +   * @param numRows Number of Vectors in the RDD.
    +   * @param numColumns Number of elements in each Vector.
    +   * @param numPartitions Number of partitions in the RDD.
    +   * @param seed Seed for the RNG that generates the seed for the generator in each partition.
    +   * @return RDD[Vector] with vectors containing i.i.d samples ~ U[0.0, 1.0].
    +   */
    +  def uniformVectorRDD(sc: SparkContext,
    +      numRows: Long,
    +      numColumns: Int,
    --- End diff --
    
    Do you mind changing it to `numCols` to match the naming in distributed matrices?



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15307029
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +// TODO add Scaladocs once API fully approved
    +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring
    +// down the number of methods here.
    +object RandomRDDGenerators {
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val uniform = new UniformGenerator()
    +    randomRDD(sc, size, numPartitions, uniform, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val normal = new StandardNormalGenerator()
    +    randomRDD(sc, size, numPartitions, normal, seed)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    normalRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    normalRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  def poissonRDD(sc: SparkContext,
    +      size: Long,
    +      numPartitions: Int,
    +      mean: Double,
    +      seed: Long): RDD[Double] = {
    +    val poisson = new PoissonGenerator(mean)
    +    randomRDD(sc, size, numPartitions, poisson, seed)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = {
    +    poissonRDD(sc, size, sc.defaultParallelism, mean, seed)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = {
    +    poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = {
    +    poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong)
    +  }
    +
    +  def randomRDD(sc: SparkContext,
    +      size: Long,
    +      numPartitions: Int,
    +      distribution: DistributionGenerator,
    --- End diff --
    
    Hmm let's go with `generator` to be clear.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213069
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +// TODO add Scaladocs once API fully approved
    +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring
    +// down the number of methods here.
    +object RandomRDDGenerators {
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val uniform = new UniformGenerator()
    +    randomRDD(sc, size, numPartitions, uniform, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    --- End diff --
    
    It is very confusing to have both `(SparkContext, Long, Int)` and `(SparkContext, Long, Long)`. If a user overlooks `(SparkContext, Long, Int)`, treats the third argument as the seed, and passes an integer, the call actually sets the number of partitions. Maybe we should only allow default values at the end. That is,
    
    ~~~
    def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long)
    
    def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int)
    ~~~
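
The pitfall described above can be reproduced without Spark. The following is a minimal sketch, assuming two hypothetical stand-in overloads (`uniform` is not part of the PR, and the `SparkContext` parameter is left out so the snippet runs on its own). It shows that Scala's overload resolution prefers the `Int` parameter when the caller passes an integer literal.

```scala
// Hypothetical stand-ins for the two three-argument overloads under discussion;
// the SparkContext parameter is omitted so the sketch runs without Spark.
object OverloadSketch {
  def uniform(size: Long, numPartitions: Int): String =
    s"numPartitions=$numPartitions"

  def uniform(size: Long, seed: Long): String =
    s"seed=$seed"

  // An Int literal selects the (Long, Int) overload, even if the caller meant a seed.
  val fromIntLiteral: String = uniform(100L, 42)

  // Only an explicit Long selects the seed overload.
  val fromLongLiteral: String = uniform(100L, 42L)
}
```

Keeping only the overloads that drop trailing arguments, as suggested above, removes this ambiguity because no two signatures then differ solely in `Int` versus `Long`.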



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389629
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +import scala.util.Random
    +
    +private[mllib] class RandomRDDPartition(override val index: Int,
    +    val size: Int,
    +    val generator: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
    --- End diff --
    
    The comment needs an update, because a Long greater than Int.MaxValue can still cast to a positive Int:
    
    ~~~
    scala> 100000000000L.toInt
    res0: Int = 1215752192
    ~~~
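
Because the narrowed value can be positive, a sign check on the `Int` alone cannot detect the overflow. The sketch below illustrates one possible guard, assuming a hypothetical helper named `checkedSize` (not part of the PR) that validates the `Long` before narrowing it.

```scala
object SizeCheckSketch {
  // Narrowing keeps only the low 32 bits, so the result can be positive yet wrong.
  val truncated: Int = 100000000000L.toInt

  // Hypothetical helper: validate the Long before narrowing it.
  def checkedSize(size: Long): Int = {
    require(size >= 0 && size <= Int.MaxValue,
      s"Partition size must be in [0, ${Int.MaxValue}], got $size")
    size.toInt
  }
}
```

With this shape, `SizeCheckSketch.checkedSize(10L)` returns the value unchanged, while an oversized argument fails with an `IllegalArgumentException` instead of silently wrapping.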



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15271948
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +// TODO add Scaladocs once API fully approved
    +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring
    +// down the number of methods here.
    +object RandomRDDGenerators {
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val uniform = new UniformGenerator()
    +    randomRDD(sc, size, numPartitions, uniform, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val normal = new StandardNormalGenerator()
    +    randomRDD(sc, size, numPartitions, normal, seed)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    normalRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    normalRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  def poissonRDD(sc: SparkContext,
    +      size: Long,
    +      numPartitions: Int,
    +      mean: Double,
    +      seed: Long): RDD[Double] = {
    +    val poisson = new PoissonGenerator(mean)
    +    randomRDD(sc, size, numPartitions, poisson, seed)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = {
    +    poissonRDD(sc, size, sc.defaultParallelism, mean, seed)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = {
    +    poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = {
    +    poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong)
    +  }
    +
    +  def randomRDD(sc: SparkContext,
    +      size: Long,
    +      numPartitions: Int,
    +      distribution: DistributionGenerator,
    --- End diff --
    
    Which do you prefer, `rng` or `generator`? `rng` feels like an instance of `RandomNumberGenerator`. As we discussed, `DistributionGenerator` is different from `RandomNumberGenerator`.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213059
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    + */
    +trait DistributionGenerator extends Pseudorandom with Serializable {
    +
    +  /**
    +   * @return An i.i.d sample as a Double from an underlying distribution.
    --- End diff --
    
    Change `@return` to `Returns`. Otherwise the summary will be empty in the generated docs.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15215522
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    + */
    +trait DistributionGenerator extends Pseudorandom with Serializable {
    +
    +  /**
    +   * @return An i.i.d sample as a Double from an underlying distribution.
    +   */
    +  def nextValue(): Double
    +
    +  /**
    +   * @return A copy of the DistributionGenerator with a new instance of the rng object used in the
    +   *         class when applicable. Each partition has a unique seed and therefore requires its
    +   *         own instance of the DistributionGenerator.
    +   */
    +  def newInstance(): DistributionGenerator
    +}
    +
    +/**
    + * Generates i.i.d. samples from U[0.0, 1.0]
    + */
    +class UniformGenerator() extends DistributionGenerator {
    +
    +  // XORShiftRandom for better performance. Thread safety isn't necessary here.
    +  private val random = new XORShiftRandom()
    +
    +  /**
    +   * @return An i.i.d sample as a Double from U[0.0, 1.0].
    +   */
    +  override def nextValue(): Double = {
    +    random.nextDouble()
    +  }
    +
    +  /** Set random seed. */
    +  override def setSeed(seed: Long) = random.setSeed(seed)
    +
    +  override def newInstance(): UniformGenerator = new UniformGenerator()
    +}
    +
    +/**
    + * Generates i.i.d. samples from the Standard Normal Distribution.
    + */
    +class StandardNormalGenerator() extends DistributionGenerator {
    +
    +  // XORShiftRandom for better performance. Thread safety isn't necessary here.
    +  private val random = new XORShiftRandom()
    --- End diff --
    
    Is it allowed to use a DistributionGenerator before calling setSeed? It would seem simpler to disallow that, but this seems to be something it inherited from trait Pseudorandom.
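
One way to rule out use before seeding is to require the seed at construction time. The sketch below is only an illustration of that idea: `SeededUniform` is a hypothetical class, it uses `java.util.Random` rather than the `XORShiftRandom` in the diff, and it does not implement the `setSeed` contract from `Pseudorandom`, which is the trade-off raised in the comment.

```scala
import java.util.Random

object SeededSketch {
  // Hypothetical generator: the seed is a constructor argument, so an
  // unseeded instance cannot exist.
  final class SeededUniform(seed: Long) extends Serializable {
    private val random = new Random(seed)
    def nextValue(): Double = random.nextDouble()
  }

  // Draws the first n values from a fresh generator with the given seed.
  def firstValues(seed: Long, n: Int): Seq[Double] = {
    val gen = new SeededUniform(seed)
    Seq.fill(n)(gen.nextValue())
  }
}
```

Two calls to `SeededSketch.firstValues` with the same seed yield the same sequence, which is the deterministic behavior the RDD needs when a partition is recomputed.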



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15262023
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    + */
    +trait DistributionGenerator extends Pseudorandom with Serializable {
    +
    +  /**
    +   * @return An i.i.d sample as a Double from an underlying distribution.
    +   */
    +  def nextValue(): Double
    +
    +  /**
    +   * @return A copy of the DistributionGenerator with a new instance of the rng object used in the
    +   *         class when applicable. Each partition has a unique seed and therefore requires its
    +   *         own instance of the DistributionGenerator.
    +   */
    +  def newInstance(): DistributionGenerator
    --- End diff --
    
    Ideally, `newInstance` would be an abstract static method on DistributionGenerator that takes an instance as an argument, which would better express that we're copying the class members into the new instance. But abstract static methods in interfaces aren't really supported in Scala (combining a trait and an object here would be messy for users to implement), so `copy` will do nicely here.
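
As a rough illustration of the instance-level `copy` approach, the sketch below uses hypothetical names throughout (`Generator`, `ExponentialGenerator`); neither is in the PR, and `java.util.Random` stands in for the RNGs used in the diff. The point is that `copy` carries over the distribution parameters while giving the new instance its own RNG state.

```scala
import java.util.Random

object CopySketch {
  // Hypothetical trait; `copy` plays the role discussed for `newInstance`.
  trait Generator extends Serializable {
    def setSeed(seed: Long): Unit
    def nextValue(): Double
    /** Returns a generator with the same parameters but its own RNG state. */
    def copy(): Generator
  }

  // An exponential distribution stands in for a parameterised generator.
  final class ExponentialGenerator(val rate: Double) extends Generator {
    private val random = new Random()
    override def setSeed(seed: Long): Unit = random.setSeed(seed)
    // Inverse-CDF sampling: -ln(1 - U) / rate, with U in [0, 1).
    override def nextValue(): Double = -math.log(1.0 - random.nextDouble()) / rate
    override def copy(): ExponentialGenerator = new ExponentialGenerator(rate)
  }
}
```

Each implementer only has to write a one-line `copy`, and callers can seed every copy independently.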



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-50213682
  
    QA tests have started for PR 1520. This patch merges cleanly. <br>View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17205/consoleFull



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389677
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.StatCounter
    +
    +/**
    + * Note: avoid including APIs that do not set the seed for the RNG in unit tests
    + * in order to guarantee deterministic behavior.
    + *
    + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
    + */
    --- End diff --
    
    Those comments are not JavaDoc. Could you move them inside the class body and change the first line from `/**` to `/*`?



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15275118
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    +    val otherPartitionSize = size / numSlices
    +
    +    val partitions = new Array[RandomRDDPartition](numSlices)
    +    var i = 0
    +    while (i < numSlices) {
    +      partitions(i) =  if (i == 0) {
    +        new RandomRDDPartition(i, firstPartitionSize, rng, seed)
    +      } else {
    +        new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed)
    +      }
    +      i += 1
    +    }
    +    partitions.asInstanceOf[Array[Partition]]
    +  }
    +
    +  // The RNG has to be reset every time the iterator is requested to guarantee same data
    +  // every time the content of the RDD is examined.
    +  def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
    +    partition.rng.setSeed(partition.seed + partition.index)
    --- End diff --
    
    BTW I think this java.util.Random issue is the reason we switched to the current approach in PartitionwiseSampledRDD.
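
The earlier comment that this remark refers to is not shown here, so the following rests on an assumption: that the issue meant is the known tendency of `java.util.Random` instances created with nearby seeds to produce nearly identical first outputs. The sketch uses hypothetical helper names and plain `java.util.Random`, not the generators in the diff, to show what seeding with `seed + index` could look like under that assumption.

```scala
import java.util.Random

object NearbySeedsSketch {
  // First draw from ten generators seeded base, base + 1, ..., base + 9,
  // mirroring the `seed + index` pattern in the diff above.
  def firstDraws(base: Long): Seq[Double] =
    (0 until 10).map(i => new Random(base + i).nextDouble())

  // Difference between the largest and smallest first draw.
  def spread(base: Long): Double = {
    val draws = firstDraws(base)
    draws.max - draws.min
  }
}
```

Ten independent uniform draws would typically cover most of [0, 1), whereas `NearbySeedsSketch.spread(0L)` comes out far below that. Scrambling or hashing the per-partition seed before use is one common way to avoid this kind of correlation.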


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213061
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    + */
    +trait DistributionGenerator extends Pseudorandom with Serializable {
    +
    +  /**
    +   * @return An i.i.d sample as a Double from an underlying distribution.
    +   */
    +  def nextValue(): Double
    +
    +  /**
    +   * @return A copy of the DistributionGenerator with a new instance of the rng object used in the
    +   *         class when applicable. Each partition has a unique seed and therefore requires its
    --- End diff --
    
    `partition` has no context here. Maybe simply mention that this is for running multiple instances concurrently.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15275100
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    +    val otherPartitionSize = size / numSlices
    +
    +    val partitions = new Array[RandomRDDPartition](numSlices)
    +    var i = 0
    +    while (i < numSlices) {
    +      partitions(i) =  if (i == 0) {
    +        new RandomRDDPartition(i, firstPartitionSize, rng, seed)
    +      } else {
    +        new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed)
    +      }
    +      i += 1
    +    }
    +    partitions.asInstanceOf[Array[Partition]]
    +  }
    +
    +  // The RNG has to be reset every time the iterator is requested to guarantee same data
    +  // every time the content of the RDD is examined.
    +  def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
    +    partition.rng.setSeed(partition.seed + partition.index)
    --- End diff --
    
    Unfortunately, I'm not sure this is a great idea; see http://stackoverflow.com/questions/426350/seeding-java-util-random-with-consecutive-numbers. I think it would be safer to use an RNG to generate the seeds. In tests, you can just make sure that "most" of them are distinct instead, or you can test for the specific sequence expected from our generator.
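
The suggestion above could be sketched roughly as follows: rather than `seed + index`, a single `java.util.Random` seeded with the RDD-level seed hands out one seed per partition. The object `SeedSketch` and the helper `partitionSeeds` are hypothetical names used only for illustration and are not part of the PR; the sketch assumes the seeds are drawn once, in partition order.

```scala
import java.util.Random

object SeedSketch {
  // Hypothetical helper (not from the PR): derive one seed per partition by
  // drawing from a master RNG instead of computing masterSeed + partitionIndex.
  def partitionSeeds(masterSeed: Long, numPartitions: Int): Array[Long] = {
    require(numPartitions > 0, "Positive number of partitions required.")
    val master = new Random(masterSeed)
    // Array.fill evaluates master.nextLong() once per element, in order.
    Array.fill(numPartitions)(master.nextLong())
  }
}
```

Because the master RNG is itself seeded, the same RDD-level seed always reproduces the same per-partition seeds, which keeps the "same data every time the RDD is examined" property while avoiding consecutive seeds.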


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389640
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,118 @@
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +import scala.util.Random
    +
    +private[mllib] class RandomRDDPartition(override val index: Int,
    +    val size: Int,
    +    val generator: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
    +  require(size > 0, "Positive partition size required.")
    --- End diff --
    
    If `numPartitions` > `size`, there will be empty partitions. We should allow this case, because it happens when a user relies on the default number of partitions, which may be greater than `size`.
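
One way to allow empty partitions, sketched under stated assumptions: partition `i` covers the half-open index range from `i * size / numPartitions` to `(i + 1) * size / numPartitions`, so sizes always sum to `size` and some are zero when there are more partitions than elements. The object `PartitionSizeSketch` and the helper `partitionSizes` are hypothetical and not taken from the PR; the sketch also assumes that each partition's size fits in an `Int` and that `size * numPartitions` does not overflow a `Long`.

```scala
object PartitionSizeSketch {
  // Hypothetical helper (not from the PR): split `size` elements across
  // `numPartitions` partitions, permitting zero-sized partitions.
  def partitionSizes(size: Long, numPartitions: Int): Array[Int] = {
    require(size >= 0, "Non-negative RDD size required.")
    require(numPartitions > 0, "Positive number of partitions required.")
    Array.tabulate(numPartitions) { i =>
      val start = i * size / numPartitions       // Int * Long widens to Long
      val end = (i + 1) * size / numPartitions
      (end - start).toInt                        // assumed to fit in an Int
    }
  }
}
```

With a scheme like this, the per-partition check would need to be `size >= 0` rather than `size > 0`.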



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15265297
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    --- End diff --
    
    There are no restrictions on the size of each Partition from the trait (in fact, it doesn't even need to have a size). The restriction of `size <= Int.MaxValue` applies when the RDD is cached. In the case of `glom`, the size of the entire RDD needs to be `<= Int.MaxValue` since that's the max size of an array.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-50213475
  
    Jenkins, retest this please.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-49695577
  
    @falaki @jkbradley @mengxr



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15265929
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    +    val otherPartitionSize = size / numSlices
    +
    +    val partitions = new Array[RandomRDDPartition](numSlices)
    +    var i = 0
    +    while (i < numSlices) {
    +      partitions(i) =  if (i == 0) {
    +        new RandomRDDPartition(i, firstPartitionSize, rng, seed)
    --- End diff --
    
    Or I could just add `.newInstance()` on line 118, which I prefer since it's not necessary for us to create a new rng every time the RDD is computed. But thanks for pointing out this use case.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389612
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
    @@ -0,0 +1,422 @@
    +
    +package org.apache.spark.mllib.random
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +/**
    + * Generator methods for creating RDDs comprised of i.i.d samples from some distribution.
    + *
    + * TODO Generate RDD[Vector] from multivariate distributions.
    --- End diff --
    
    Move `TODO: ...` inside the code block. Otherwise, it becomes part of the documentation.
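
A small illustration of the point above, assuming standard Scaladoc behavior: text inside a `/** ... */` block is published as documentation, while a plain `//` comment in the body is not. The object name `RandomRDDGeneratorsSketch` and its member `supportedOutputs` are placeholders so that the sketch compiles on its own; they are not part of the PR.

```scala
/**
 * Generator methods for creating RDDs comprised of i.i.d. samples from some distribution.
 */
object RandomRDDGeneratorsSketch {
  // TODO Generate RDD[Vector] from multivariate distributions.
  // As a plain `//` comment inside the body, the TODO stays out of the Scaladoc.

  /** Placeholder member so the sketch compiles; not part of the PR. */
  def supportedOutputs: Seq[String] = Seq("RDD[Double]", "RDD[Vector]")
}
```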



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15271812
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    +    val otherPartitionSize = size / numSlices
    +
    +    val partitions = new Array[RandomRDDPartition](numSlices)
    +    var i = 0
    +    while (i < numSlices) {
    +      partitions(i) =  if (i == 0) {
    +        new RandomRDDPartition(i, firstPartitionSize, rng, seed)
    --- End diff --
    
    It feels more secure if people see the following lines in the same group.
    
    ~~~
    val thisGenerator = generator.copy()
    thisGenerator.setSeed(seed)
    iter.fill(...)(thisGenerator.nextValue())
    ~~~
    
    If there are only
    
    ~~~
    generator.setSeed(seed)
    iter.fill(...)(generator.nextValue())
    ~~~
    
    people would wonder whether there are concurrency issues.
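
The copy-then-seed grouping suggested above might look like the following self-contained sketch. The trait `Generator` is a simplified stand-in for the PR's `DistributionGenerator`, assumed here to expose only `nextValue`, `setSeed` and `copy`; `UniformGenerator` and the helper `iterator` are hypothetical names for illustration.

```scala
import java.util.Random

object CopySeedSketch {
  // Simplified stand-in for DistributionGenerator (assumption: three methods).
  trait Generator extends Serializable {
    def nextValue(): Double
    def setSeed(seed: Long): Unit
    def copy(): Generator
  }

  class UniformGenerator extends Generator {
    private val random = new Random()
    override def nextValue(): Double = random.nextDouble()
    override def setSeed(seed: Long): Unit = random.setSeed(seed)
    override def copy(): UniformGenerator = new UniformGenerator()
  }

  // Copy first, then seed the copy: the shared `generator` is never mutated,
  // so concurrent callers cannot interfere with each other's sequences.
  def iterator(generator: Generator, seed: Long, size: Int): Iterator[Double] = {
    val thisGenerator = generator.copy()
    thisGenerator.setSeed(seed)
    Iterator.fill(size)(thisGenerator.nextValue())
  }
}
```

Keeping the three lines together makes it visible at the call site that each iterator owns its generator, which is the reassurance the comment is asking for.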




[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-50289239
  
    LGTM. Merged into master. Thanks for adding random RDD generators!!



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213091
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    --- End diff --
    
    If we can't have more than `Int.MaxValue` items per iteration, this is simply `Iterator.fill(numElem)(rng.nextValue())`.
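
A minimal sketch of the two options, for illustration only: `Iterator.fill` from the Scala standard library takes an `Int` length, so it covers the case the comment describes, while counts above `Int.MaxValue` would still need a hand-written iterator with a `Long` counter. The object `FixedSizeIteratorSketch` and the parameter `gen` (a plain function standing in for `rng.nextValue()`) are hypothetical.

```scala
object FixedSizeIteratorSketch {
  // Element count fits in an Int: the standard library iterator suffices.
  // The second argument of Iterator.fill is by-name, so gen() runs per element.
  def small(numElem: Int, gen: () => Double): Iterator[Double] =
    Iterator.fill(numElem)(gen())

  // Element count may exceed Int.MaxValue: count with a Long instead.
  def large(numElem: Long, gen: () => Double): Iterator[Double] =
    new Iterator[Double] {
      private var produced = 0L
      override def hasNext: Boolean = produced < numElem
      override def next(): Double = {
        produced += 1
        gen()
      }
    }
}
```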



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389923
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala ---
    @@ -0,0 +1,91 @@
    +
    +package org.apache.spark.mllib.random
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.util.StatCounter
    +
    +// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
    +class DistributionGeneratorSuite extends FunSuite {
    +
    +  def apiChecks(gen: DistributionGenerator) {
    +
    +    // resetting seed should generate the same sequence of random numbers
    +    gen.setSeed(42L)
    +    val array1 = (0 until 1000).map(_ => gen.nextValue())
    +    gen.setSeed(42L)
    +    val array2 = (0 until 1000).map(_ => gen.nextValue())
    +    assert(array1.equals(array2))
    +
    +    // newInstance should contain a different instance of the rng
    +    // i.e. setting different seeds for different instances produces different sequences of
    +    // random numbers.
    +    val gen2 = gen.copy()
    +    gen.setSeed(0L)
    +    val array3 = (0 until 1000).map(_ => gen.nextValue())
    +    gen2.setSeed(1L)
    +    val array4 = (0 until 1000).map(_ => gen2.nextValue())
    +    // Compare arrays instead of elements since individual elements can coincide by chance but the
    +    // sequences should differ given two different seeds.
    +    assert(!array3.equals(array4))
    +
    +    // test that setting the same seed in the copied instance produces the same sequence of numbers
    +    gen.setSeed(0L)
    +    val array5 = (0 until 1000).map(_ => gen.nextValue())
    +    gen2.setSeed(0L)
    +    val array6 = (0 until 1000).map(_ => gen2.nextValue())
    +    assert(array5.equals(array6))
    +  }
    +
    +  def distributionChecks(gen: DistributionGenerator,
    +      mean: Double = 0.0,
    +      stddev: Double = 1.0,
    +      epsilon: Double = 1e-3) {
    +    for (seed <- 0 until 5) {
    +      gen.setSeed(seed.toLong)
    +      val sample = (0 until 10000000).map { _ => gen.nextValue()}
    --- End diff --
    
    Just curious, how long does the entire test suite take to finish?



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389671
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/DistributionGeneratorSuite.scala ---
    @@ -0,0 +1,91 @@
    +
    +package org.apache.spark.mllib.random
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.util.StatCounter
    +
    +// TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
    +class DistributionGeneratorSuite extends FunSuite {
    +
    +  def apiChecks(gen: DistributionGenerator) {
    +
    +    // resetting seed should generate the same sequence of random numbers
    +    gen.setSeed(42L)
    +    val array1 = (0 until 1000).map(_ => gen.nextValue())
    +    gen.setSeed(42L)
    +    val array2 = (0 until 1000).map(_ => gen.nextValue())
    +    assert(array1.equals(array2))
    +
    +    // newInstance should contain a different instance of the rng
    +    // i.e. setting different seeds for different instances produces different sequences of
    +    // random numbers.
    +    val gen2 = gen.copy()
    +    gen.setSeed(0L)
    +    val array3 = (0 until 1000).map(_ => gen.nextValue())
    +    gen2.setSeed(1L)
    +    val array4 = (0 until 1000).map(_ => gen2.nextValue())
    +    // Compare arrays instead of elements since individual elements can coincide by chance but the
    +    // sequences should differ given two different seeds.
    +    assert(!array3.equals(array4))
    +
    +    // test that setting the same seed in the copied instance produces the same sequence of numbers
    +    gen.setSeed(0L)
    +    val array5 = (0 until 1000).map(_ => gen.nextValue())
    +    gen2.setSeed(0L)
    +    val array6 = (0 until 1000).map(_ => gen2.nextValue())
    +    assert(array5.equals(array6))
    +  }
    +
    +  def distributionChecks(gen: DistributionGenerator,
    +      mean: Double = 0.0,
    +      stddev: Double = 1.0,
    +      epsilon: Double = 1e-3) {
    +    for (seed <- 0 until 5) {
    +      gen.setSeed(seed.toLong)
    +      val sample = (0 until 10000000).map { _ => gen.nextValue()}
    +      val stats = new StatCounter(sample)
    +      assert(math.abs(stats.mean - mean) < epsilon)
    +      assert(math.abs(stats.stdev - stddev) < epsilon)
    +    }
    +  }
    +
    +  test("UniformGenerator") {
    +    val uniform = new UniformGenerator()
    +    apiChecks(uniform)
    +    // Stddev of uniform distribution = (ub - lb) / math.sqrt(12)
    +    distributionChecks(uniform, 0.5, 1 / math.sqrt(12))
    +  }
    +
    +  test("StandardNormalGenerator") {
    +    val normal = new StandardNormalGenerator()
    +    apiChecks(normal)
    +    distributionChecks(normal, 0.0, 1.0)
    +  }
    +
    +  test("PoissonGenerator") {
    +    // mean = 0.0 will not pass the API checks since 0.0 is always deterministically produced.
    +    for (mean <- List(1.0, 5.0, 100.0)) {
    +      val poisson = new PoissonGenerator(mean)
    +      apiChecks(poisson)
    +      distributionChecks(poisson, mean, math.sqrt(mean), 1e-2)
    +    }
    +  }
    +}
    +
    --- End diff --
    
    Remove the extra empty line.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15271907
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    --- End diff --
    
    So let us not put more than `Int.MaxValue` items in a single partition. If the size is beyond this limit, we should throw an error. Unlike `collect`, `glom` converts `RDD[T]` to `RDD[Array[T]]`, coalescing all elements within each partition into an array. `glom` is not frequently used, but we should support `cache`, which means we shouldn't allow more than `Int.MaxValue` items in a single partition.
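
A minimal sketch of the limit described above, assuming partition sizes are computed up front and spread as evenly as possible. `PartitionSizeSketch` and `partitionSizes` are hypothetical names, not code from the PR, and the sketch runs without Spark.

```scala
object PartitionSizeSketch {
  // Hypothetical helper: split `size` elements across `numPartitions`, failing
  // fast if any single partition would hold more than Int.MaxValue elements.
  def partitionSizes(size: Long, numPartitions: Int): Array[Int] = {
    require(size > 0, "Positive RDD size required.")
    require(numPartitions > 0, "Positive number of partitions required")
    val base = size / numPartitions
    val remainder = size % numPartitions
    // The largest partition gets at most one extra element.
    val largest = if (remainder > 0) base + 1 else base
    require(largest <= Int.MaxValue,
      s"Partition size $largest exceeds Int.MaxValue; use more partitions.")
    // The first `remainder` partitions take one extra element each.
    Array.tabulate(numPartitions) { i =>
      (if (i < remainder) base + 1 else base).toInt
    }
  }
}
```

For example, `PartitionSizeSketch.partitionSizes(10L, 3)` yields sizes 4, 3 and 3, while asking for `Int.MaxValue.toLong * 100L` elements in only 10 partitions fails the `require` with an `IllegalArgumentException`.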



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15305960
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    +    val otherPartitionSize = size / numSlices
    +
    +    val partitions = new Array[RandomRDDPartition](numSlices)
    +    var i = 0
    +    while (i < numSlices) {
    +      partitions(i) =  if (i == 0) {
    +        new RandomRDDPartition(i, firstPartitionSize, rng, seed)
    +      } else {
    +        new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed)
    +      }
    +      i += 1
    +    }
    +    partitions.asInstanceOf[Array[Partition]]
    +  }
    +
    +  // The RNG has to be reset every time the iterator is requested to guarantee same data
    +  // every time the content of the RDD is examined.
    +  def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
    +    partition.rng.setSeed(partition.seed + partition.index)
    --- End diff --
    
    In that case, we need to change the implementation for stratified sampling as well, @mengxr.
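
The quoted `getPointIterator` line reseeds the generator with `partition.seed + partition.index` every time the iterator is requested, so recomputing a partition gives the same values. A minimal sketch of that idea follows, using `java.util.Random` as a stand-in for `DistributionGenerator`. `ReseedSketch` and `partitionValues` are illustrative names, not from the PR.

```scala
import java.util.Random

object ReseedSketch {
  // Recreate the RNG from (seed + index) on every call, mirroring the
  // setSeed(partition.seed + partition.index) call in the quoted diff.
  def partitionValues(seed: Long, index: Int, n: Int): Vector[Double] = {
    val rng = new Random(seed + index)
    Vector.fill(n)(rng.nextDouble())
  }
}
```

Calling `ReseedSketch.partitionValues(42L, 0, 5)` twice returns the same five values, while partition index 1 with the same base seed is seeded differently and produces a different sequence.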



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213072
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +// TODO add Scaladocs once API fully approved
    +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring
    +// down the number of methods here.
    +object RandomRDDGenerators {
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val uniform = new UniformGenerator()
    +    randomRDD(sc, size, numPartitions, uniform, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    uniformRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val normal = new StandardNormalGenerator()
    +    randomRDD(sc, size, numPartitions, normal, seed)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    normalRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    +    normalRDD(sc, size, numPartitions, Utils.random.nextLong)
    +  }
    +
    +  def normalRDD(sc: SparkContext, size: Long): RDD[Double] = {
    +    normalRDD(sc, size, sc.defaultParallelism, Utils.random.nextLong)
    +  }
    +
    +  def poissonRDD(sc: SparkContext,
    +      size: Long,
    +      numPartitions: Int,
    +      mean: Double,
    +      seed: Long): RDD[Double] = {
    +    val poisson = new PoissonGenerator(mean)
    +    randomRDD(sc, size, numPartitions, poisson, seed)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, mean: Double, seed: Long): RDD[Double] = {
    +    poissonRDD(sc, size, sc.defaultParallelism, mean, seed)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, numPartitions: Int, mean: Double): RDD[Double] = {
    +    poissonRDD(sc, size, numPartitions, mean, Utils.random.nextLong)
    +  }
    +
    +  def poissonRDD(sc: SparkContext, size: Long, mean: Double): RDD[Double] = {
    +    poissonRDD(sc, size, sc.defaultParallelism, mean, Utils.random.nextLong)
    +  }
    +
    +  def randomRDD(sc: SparkContext,
    +      size: Long,
    +      numPartitions: Int,
    +      distribution: DistributionGenerator,
    --- End diff --
    
    We need to be consistent with the argument name. `distribution` is used here, but `rng` is used in `randomVectorRDD`. `generator` sounds better to me than `distribution` because `DistributionGenerator` is a generator, not a distribution, and in this context we don't need the full `distributionGenerator`.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-50120556
  
    @dorx Besides the comments, could you mark distribution generators and methods that require distribution generators `@Experimental`? Part of the reason is that we don't have the API in Python, and whether we should implement the same in Python is not clear.



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213082
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    --- End diff --
    
    You can put `override` directly in the constructor.
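
A small sketch of this suggestion. The `Partition` trait below is a local stand-in for `org.apache.spark.Partition` so the example compiles without Spark, and the `rng` field from the quoted diff is left out for brevity.

```scala
object OverrideInConstructorSketch {
  // Stand-in for org.apache.spark.Partition (assumption: only `index` matters here).
  trait Partition extends Serializable {
    def index: Int
  }

  // Before (as in the quoted diff):
  //   class RandomRDDPartition(val idx: Int, ...) extends Partition {
  //     override val index: Int = idx
  //   }
  // After: the constructor parameter itself overrides `index`.
  class RandomRDDPartition(override val index: Int,
      val size: Long,
      val seed: Long) extends Partition
}
```

This drops the extra `idx` field and the class body while keeping the same public `index` member.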



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by falaki <gi...@git.apache.org>.
Github user falaki commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15246377
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/RandomRDDGenerators.scala ---
    @@ -0,0 +1,235 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import org.apache.spark.SparkContext
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomVectorRDD, RandomRDD}
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +// TODO add Scaladocs once API fully approved
    +// Alternatively, we can use the generator pattern to set numPartitions, seed, etc instead to bring
    +// down the number of methods here.
    +object RandomRDDGenerators {
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int, seed: Long): RDD[Double] = {
    +    val uniform = new UniformGenerator()
    +    randomRDD(sc, size, numPartitions, uniform, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, seed: Long): RDD[Double] = {
    +    uniformRDD(sc, size, sc.defaultParallelism, seed)
    +  }
    +
    +  def uniformRDD(sc: SparkContext, size: Long, numPartitions: Int): RDD[Double] = {
    --- End diff --
    
    Alternatively we can switch to a generator model. Doris suggested it in her TODO. What do you guys think?
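
One possible reading of the "generator model" is a small builder that carries the optional settings, so a single entry point replaces the four overloads per distribution. The sketch below is an assumption about what that could look like, not the design discussed in the PR. Every name in it (`RandomRDDSpec`, `withNumPartitions`, `withSeed`, `resolve`) is hypothetical, and it runs without Spark.

```scala
object GeneratorModelSketch {
  // Hypothetical immutable builder for the optional arguments.
  final case class RandomRDDSpec(
      size: Long,
      numPartitions: Option[Int] = None,
      seed: Option[Long] = None) {

    def withNumPartitions(n: Int): RandomRDDSpec = copy(numPartitions = Some(n))

    def withSeed(s: Long): RandomRDDSpec = copy(seed = Some(s))

    // Fill in defaults the way the overloads in the quoted diff do:
    // sc.defaultParallelism for partitions, a fresh random Long for the seed.
    def resolve(defaultParallelism: Int, freshSeed: () => Long): (Long, Int, Long) =
      (size, numPartitions.getOrElse(defaultParallelism), seed.getOrElse(freshSeed()))
  }
}
```

A caller would write something like `RandomRDDSpec(1000L).withSeed(7L)` and pass the result to a single `uniformRDD`-style method, which resolves the missing values itself.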



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-49935146
  
    QA tests have started for PR 1520. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17060/consoleFull



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-50215914
  
    QA results for PR 1520:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
    trait DistributionGenerator extends Pseudorandom with Serializable {
    class UniformGenerator extends DistributionGenerator {
    class StandardNormalGenerator extends DistributionGenerator {
    class PoissonGenerator(val mean: Double) extends DistributionGenerator {

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17205/consoleFull



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-49946571
  
    QA results for PR 1520:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
    trait DistributionGenerator extends Pseudorandom with Serializable {
    class UniformGenerator extends DistributionGenerator {
    class StandardNormalGenerator extends DistributionGenerator {
    class PoissonGenerator(val mean: Double) extends DistributionGenerator {

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17060/consoleFull



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389784
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.StatCounter
    +
    +/**
    + * Note: avoid including APIs that do not set the seed for the RNG in unit tests
    + * in order to guarantee deterministic behavior.
    + *
    + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
    + */
    +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
    +
    +  def testGeneratedRDD(rdd: RDD[Double],
    +      expectedSize: Long,
    +      expectedNumPartitions: Int,
    +      expectedMean: Double,
    +      expectedStddev: Double,
    +      epsilon: Double = 0.01) {
    +    val stats = rdd.stats()
    +    assert(expectedSize === stats.count)
    +    assert(expectedNumPartitions === rdd.partitions.size)
    +    assert(math.abs(stats.mean - expectedMean) < epsilon)
    +    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
    +  }
    +
    +  // assume test RDDs are small
    +  def testGeneratedVectorRDD(rdd: RDD[Vector],
    +      expectedRows: Long,
    +      expectedColumns: Int,
    +      expectedNumPartitions: Int,
    +      expectedMean: Double,
    +      expectedStddev: Double,
    +      epsilon: Double = 0.01) {
    +    assert(expectedNumPartitions === rdd.partitions.size)
    +    val values = new ArrayBuffer[Double]()
    +    rdd.collect.foreach { vector => {
    +      assert(vector.size === expectedColumns)
    +      values ++= vector.toArray
    +    }}
    +    assert(expectedRows === values.size / expectedColumns)
    +    val stats = new StatCounter(values)
    +    assert(math.abs(stats.mean - expectedMean) < epsilon)
    +    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
    +  }
    +
    +  test("RandomRDD sizes") {
    +
    +    // some cases where size % numParts != 0 to test getPartitions behaves correctly
    +    for ((size, numPartitions) <- List((10000, 6), (12345, 1), (1000, 101))) {
    +      val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
    +      assert(rdd.count() === size)
    +      assert(rdd.partitions.size === numPartitions)
    +
    +      // check that partition sizes are balanced
    +      val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
    +      val partStats = new StatCounter(partSizes)
    +      assert(partStats.stdev < 1.0)
    +    }
    +
    +    // size > Int.MaxValue
    +    val size = Int.MaxValue.toLong * 100L
    +    val numPartitions = 101
    +    val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
    +    assert(rdd.partitions.size === numPartitions)
    +    val count = rdd.partitions.foldLeft(0L){
    +      (count, part) => count + part.asInstanceOf[RandomRDDPartition].size
    +    }
    +    assert(count === size)
    +
    +    // size needs to be positive
    +    try {
    +      new RandomRDD(sc, 0, 10, new UniformGenerator, 0L)
    --- End diff --
    
    You can use `intercept[IllegalArgumentException] { ... }`.
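
To show what this replaces, the sketch below defines a local `intercept` with the same call shape as ScalaTest's, so it runs without ScalaTest on the classpath. In the real suite the ScalaTest method would be used directly. `InterceptSketch` and `checkedSize` are illustrative names, and `checkedSize` merely stands in for the `require(size > 0, ...)` check in the `RandomRDD` constructor.

```scala
import scala.reflect.ClassTag

object InterceptSketch {
  // Local stand-in with the call shape of ScalaTest's intercept: run the body,
  // return the expected exception, and fail if it is missing or of another type.
  def intercept[T <: Throwable](body: => Any)(implicit tag: ClassTag[T]): T = {
    val caught: Option[Throwable] =
      try { body; None } catch { case t: Throwable => Some(t) }
    caught match {
      case Some(t) if tag.runtimeClass.isInstance(t) => t.asInstanceOf[T]
      case Some(t) =>
        throw new AssertionError(s"Expected ${tag.runtimeClass.getName} but got $t")
      case None =>
        throw new AssertionError(s"Expected ${tag.runtimeClass.getName} but nothing was thrown")
    }
  }

  // Stand-in for the size check in the RandomRDD constructor.
  def checkedSize(size: Long): Long = {
    require(size > 0, "Positive RDD size required.")
    size
  }
}
```

Compared with a hand-written `try`/`catch`, the test body shrinks to `intercept[IllegalArgumentException] { checkedSize(0) }`, and a missing exception fails the test instead of passing silently.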



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389645
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,118 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +import scala.util.Random
    +
    +private[mllib] class RandomRDDPartition(override val index: Int,
    +    val size: Int,
    +    val generator: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +  // Safety check in case a Long > Int.MaxValue cast to an Int was passed in as size
    +  require(size > 0, "Positive partition size required.")
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    --- End diff --
    
    `numSlices` -> `numPartitions` to match the naming in previous code?


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15265484
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    + */
    +trait DistributionGenerator extends Pseudorandom with Serializable {
    +
    +  /**
    +   * @return An i.i.d sample as a Double from an underlying distribution.
    +   */
    +  def nextValue(): Double
    +
    +  /**
    +   * @return A copy of the DistributionGenerator with a new instance of the rng object used in the
    +   *         class when applicable. Each partition has a unique seed and therefore requires its
    +   *         own instance of the DistributionGenerator.
    +   */
    +  def newInstance(): DistributionGenerator
    +}
    +
    +/**
    + * Generates i.i.d. samples from U[0.0, 1.0]
    + */
    +class UniformGenerator() extends DistributionGenerator {
    +
    +  // XORShiftRandom for better performance. Thread safety isn't necessary here.
    +  private val random = new XORShiftRandom()
    +
    +  /**
    +   * @return An i.i.d sample as a Double from U[0.0, 1.0].
    +   */
    +  override def nextValue(): Double = {
    +    random.nextDouble()
    +  }
    +
    +  /** Set random seed. */
    +  override def setSeed(seed: Long) = random.setSeed(seed)
    +
    +  override def newInstance(): UniformGenerator = new UniformGenerator()
    +}
    +
    +/**
    + * Generates i.i.d. samples from the Standard Normal Distribution.
    + */
    +class StandardNormalGenerator() extends DistributionGenerator {
    +
    +  // XORShiftRandom for better performance. Thread safety isn't necessary here.
    +  private val random = new XORShiftRandom()
    --- End diff --
    
    As with most random objects, the DistributionGenerator should be created with a default seed (so using it before calling `setSeed` is legal). I like how in Colt it's called `reseed` instead, but `setSeed` is also widely adopted.
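
A minimal sketch of the "usable before `setSeed`" behaviour described in this comment. `NormalGen` is an illustrative class, not the PR's `StandardNormalGenerator`, and it wraps `scala.util.Random` rather than `XORShiftRandom` so the snippet has no Spark dependency.

```scala
import scala.util.Random

object DefaultSeedSketch {
  // Illustrative generator (hypothetical name): the underlying RNG is built
  // eagerly with its own default seed, so nextValue() is legal before setSeed.
  class NormalGen extends Serializable {
    private val random = new Random()

    def nextValue(): Double = random.nextGaussian()

    // Reseeding restarts the sequence deterministically.
    def setSeed(seed: Long): Unit = random.setSeed(seed)
  }

  def main(args: Array[String]): Unit = {
    val gen = new NormalGen
    println(gen.nextValue()) // works without an explicit seed

    gen.setSeed(7L)
    val first = gen.nextValue()
    gen.setSeed(7L)
    assert(gen.nextValue() == first) // same seed, same first sample
  }
}
```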


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/1520


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213097
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    +    val otherPartitionSize = size / numSlices
    +
    +    val partitions = new Array[RandomRDDPartition](numSlices)
    +    var i = 0
    +    while (i < numSlices) {
    +      partitions(i) =  if (i == 0) {
    +        new RandomRDDPartition(i, firstPartitionSize, rng, seed)
    --- End diff --
    
    It is safer to make a copy in `compute` than here. In local mode, this may cause problems if a user uses the same generator to create two random RDDs.
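
A minimal sketch of copying the generator inside `compute`, assuming a simplified stand-in for the PR's `DistributionGenerator`. The names `Generator`, `UniformGen`, and the free-standing `compute` function are illustrative, not Spark APIs. The point is that each call works on its own copy, so two RDDs built from the same generator object cannot disturb each other's RNG state.

```scala
import scala.util.Random

object CopyInCompute {
  // Simplified stand-in for the PR's DistributionGenerator (illustrative only).
  trait Generator extends Serializable {
    def nextValue(): Double
    def setSeed(seed: Long): Unit
    def copy(): Generator
  }

  class UniformGen extends Generator {
    private val random = new Random()
    override def nextValue(): Double = random.nextDouble()
    override def setSeed(seed: Long): Unit = random.setSeed(seed)
    override def copy(): UniformGen = new UniformGen
  }

  // Stand-in for RDD.compute: copy the shared generator first, then seed the
  // copy, so the caller's instance is never mutated.
  def compute(shared: Generator, seed: Long, numElem: Int): Iterator[Double] = {
    val local = shared.copy()
    local.setSeed(seed)
    Iterator.fill(numElem)(local.nextValue())
  }

  def main(args: Array[String]): Unit = {
    val shared = new UniformGen
    // Two "RDDs" created from the same generator and consumed interleaved.
    val a = compute(shared, 1L, 3)
    val b = compute(shared, 2L, 3)
    val interleaved = a.zip(b).toList
    // Interleaving does not change what either one produces.
    assert(interleaved.map(_._1) == compute(shared, 1L, 3).toList)
    assert(interleaved.map(_._2) == compute(shared, 2L, 3).toList)
  }
}
```

Had both iterators shared one seeded instance instead, interleaved consumption would mix their sequences, which is the local-mode hazard the comment points at.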


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389694
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.StatCounter
    +
    +/**
    + * Note: avoid including APIs that do not set the seed for the RNG in unit tests
    + * in order to guarantee deterministic behavior.
    + *
    + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
    + */
    +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
    +
    +  def testGeneratedRDD(rdd: RDD[Double],
    +      expectedSize: Long,
    +      expectedNumPartitions: Int,
    +      expectedMean: Double,
    +      expectedStddev: Double,
    +      epsilon: Double = 0.01) {
    +    val stats = rdd.stats()
    +    assert(expectedSize === stats.count)
    +    assert(expectedNumPartitions === rdd.partitions.size)
    +    assert(math.abs(stats.mean - expectedMean) < epsilon)
    +    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
    +  }
    +
    +  // assume test RDDs are small
    +  def testGeneratedVectorRDD(rdd: RDD[Vector],
    +      expectedRows: Long,
    +      expectedColumns: Int,
    +      expectedNumPartitions: Int,
    +      expectedMean: Double,
    +      expectedStddev: Double,
    +      epsilon: Double = 0.01) {
    +    assert(expectedNumPartitions === rdd.partitions.size)
    +    val values = new ArrayBuffer[Double]()
    +    rdd.collect.foreach { vector => {
    +      assert(vector.size === expectedColumns)
    +      values ++= vector.toArray
    +    }}
    +    assert(expectedRows === values.size / expectedColumns)
    +    val stats = new StatCounter(values)
    +    assert(math.abs(stats.mean - expectedMean) < epsilon)
    +    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
    +  }
    +
    +  test("RandomRDD sizes") {
    +
    +    // some cases where size % numParts != 0 to test getPartitions behaves correctly
    +    for ((size, numPartitions) <- List((10000, 6), (12345, 1), (1000, 101))) {
    +      val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
    +      assert(rdd.count() === size)
    +      assert(rdd.partitions.size === numPartitions)
    +
    +      // check that partition sizes are balanced
    +      val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
    +      val partStats = new StatCounter(partSizes)
    +      assert(partStats.stdev < 1.0)
    --- End diff --
    
    Why check the stdev of partition sizes? The assertion should be `maxPartitionSize - minPartitionSize <= 1`.
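
A minimal sketch of a split that satisfies the `max - min <= 1` property, and of the check itself. `partitionSizes` is a hypothetical helper, not the PR's `getPartitions`; it places the boundary of slice `i` at `floor(i * size / numPartitions)`, which is an assumption about how one might balance the partitions.

```scala
object BalancedPartitions {
  // Hypothetical helper: sizes of `numPartitions` contiguous slices of `size`
  // elements, with slice i covering [i * size / n, (i + 1) * size / n).
  // Note: i * size can overflow Long for extremely large sizes; this sketch
  // ignores that case.
  def partitionSizes(size: Long, numPartitions: Int): Array[Long] = {
    require(size > 0, "Positive RDD size required.")
    require(numPartitions > 0, "Positive number of partitions required")
    Array.tabulate(numPartitions) { i =>
      val start = i * size / numPartitions
      val end = (i + 1) * size / numPartitions
      end - start
    }
  }

  def main(args: Array[String]): Unit = {
    // Same (size, numPartitions) pairs as the test under review.
    for ((size, numPartitions) <- List((10000L, 6), (12345L, 1), (1000L, 101))) {
      val sizes = partitionSizes(size, numPartitions)
      assert(sizes.sum == size)
      assert(sizes.max - sizes.min <= 1)
    }
  }
}
```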


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-50208419
  
    QA results for PR 1520:
    - This patch FAILED unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
      trait DistributionGenerator extends Pseudorandom with Serializable {
      class UniformGenerator extends DistributionGenerator {
      class StandardNormalGenerator extends DistributionGenerator {
      class PoissonGenerator(val mean: Double) extends DistributionGenerator {

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17197/consoleFull


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by dorx <gi...@git.apache.org>.
Github user dorx commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15265778
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    +    val otherPartitionSize = size / numSlices
    +
    +    val partitions = new Array[RandomRDDPartition](numSlices)
    +    var i = 0
    +    while (i < numSlices) {
    +      partitions(i) =  if (i == 0) {
    +        new RandomRDDPartition(i, firstPartitionSize, rng, seed)
    +      } else {
    +        new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed)
    +      }
    +      i += 1
    +    }
    +    partitions.asInstanceOf[Array[Partition]]
    +  }
    +
    +  // The RNG has to be reset every time the iterator is requested to guarantee same data
    +  // every time the content of the RDD is examined.
    +  def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
    +    partition.rng.setSeed(partition.seed + partition.index)
    --- End diff --
    
    Thanks for the pointer to PartitionwiseSampledRDD. I've actually looked at it a lot for the sampling PRs. There were two considerations for going with this more deterministic per-partition seed assignment. One is that it is a lot easier to test that each partition has a different seed; the other is that we're guaranteed to have a unique seed per partition. Since most RNG implementations hash or otherwise scramble the bits of the input seed before setting it anyway, I think we can get away with the current implementation.


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-50204159
  
    QA tests have started for PR 1520. This patch merges cleanly.
    View progress: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/17197/consoleFull


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by SparkQA <gi...@git.apache.org>.
Github user SparkQA commented on the pull request:

    https://github.com/apache/spark/pull/1520#issuecomment-49700088
  
    QA results for PR 1520:
    - This patch PASSES unit tests.
    - This patch merges cleanly
    - This patch adds the following public classes (experimental):
      trait DistributionGenerator extends Pseudorandom with Serializable {
      class UniformGenerator() extends DistributionGenerator {
      class StandardNormalGenerator() extends DistributionGenerator {
      class PoissonGenerator(val mean: Double) extends DistributionGenerator {

    For more information see test output:
    https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/16942/consoleFull


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15215432
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/rdd/RandomRDD.scala ---
    @@ -0,0 +1,140 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.rdd
    +
    +import org.apache.spark.{Partition, SparkContext, TaskContext}
    +import org.apache.spark.mllib.linalg.{DenseVector, Vector}
    +import org.apache.spark.mllib.random.DistributionGenerator
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.Utils
    +
    +private[mllib] class RandomRDDPartition(val idx: Int,
    +    val size: Long,
    +    val rng: DistributionGenerator,
    +    val seed: Long) extends Partition {
    +
    +  override val index: Int = idx
    +
    +}
    +
    +// These two classes are necessary since Range objects in Scala cannot have size > Int.MaxValue
    +private[mllib] class RandomRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Double](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Double] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getPointIterator(split)
    +  }
    +
    +  override def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] class RandomVectorRDD(@transient private var sc: SparkContext,
    +    size: Long,
    +    vectorSize: Int,
    +    numSlices: Int,
    +    @transient rng: DistributionGenerator,
    +    @transient seed: Long = Utils.random.nextLong) extends RDD[Vector](sc, Nil) {
    +
    +  require(size > 0, "Positive RDD size required.")
    +  require(numSlices > 0, "Positive number of partitions required")
    +  require(vectorSize > 0, "Positive vector size required.")
    +
    +  override def compute(splitIn: Partition, context: TaskContext): Iterator[Vector] = {
    +    val split = splitIn.asInstanceOf[RandomRDDPartition]
    +    RandomRDD.getVectorIterator(split, vectorSize)
    +  }
    +
    +  override protected def getPartitions: Array[Partition] = {
    +    RandomRDD.getPartitions(size, numSlices, rng, seed)
    +  }
    +}
    +
    +private[mllib] object RandomRDD {
    +
    +  private[mllib] class FixedSizePointIterator(val numElem: Long, val rng: DistributionGenerator)
    +    extends Iterator[Double] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Double = {
    +      currentSize += 1
    +      rng.nextValue()
    +    }
    +  }
    +
    +  private[mllib] class FixedSizeVectorIterator(val numElem: Long,
    +      val vectorSize: Int,
    +      val rng: DistributionGenerator)
    +    extends Iterator[Vector] {
    +
    +    private var currentSize = 0
    +
    +    override def hasNext: Boolean = currentSize < numElem
    +
    +    override def next(): Vector = {
    +      currentSize += 1
    +      new DenseVector((0 until vectorSize).map { _ => rng.nextValue() }.toArray)
    +    }
    +  }
    +
    +  def getPartitions(size: Long,
    +      numSlices: Int,
    +      rng: DistributionGenerator,
    +      seed: Long): Array[Partition] = {
    +
    +    val firstPartitionSize = size / numSlices + size % numSlices
    +    val otherPartitionSize = size / numSlices
    +
    +    val partitions = new Array[RandomRDDPartition](numSlices)
    +    var i = 0
    +    while (i < numSlices) {
    +      partitions(i) =  if (i == 0) {
    +        new RandomRDDPartition(i, firstPartitionSize, rng, seed)
    +      } else {
    +        new RandomRDDPartition(i, otherPartitionSize, rng.newInstance(), seed)
    +      }
    +      i += 1
    +    }
    +    partitions.asInstanceOf[Array[Partition]]
    +  }
    +
    +  // The RNG has to be reset every time the iterator is requested to guarantee same data
    +  // every time the content of the RDD is examined.
    +  def getPointIterator(partition: RandomRDDPartition): Iterator[Double] = {
    +    partition.rng.setSeed(partition.seed + partition.index)
    --- End diff --
    
    Instead of doing this, it would be better to give each partition a seed generated by a Random object based on the RDD's seed. This way the seeds differ in more bits from each other. For example, take a look at PartitionwiseSampledRDD in the current codebase. It uses an RDD-wide seed to create a sequence of seeds, one for each partition, then it takes those as input to new RNGs for each partition.
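
A minimal sketch of the seeding scheme suggested here: one `Random` seeded with the RDD-wide seed hands out a fresh `Long` per partition. `partitionSeeds` is a hypothetical helper name, and the snippet uses `scala.util.Random` on its own rather than any Spark class.

```scala
import scala.util.Random

object PartitionSeeds {
  // Hypothetical helper: derive one seed per partition from a single
  // RDD-wide seed, instead of using `seed + partitionIndex`.
  def partitionSeeds(rddSeed: Long, numPartitions: Int): Array[Long] = {
    val random = new Random(rddSeed)
    // Array.fill re-evaluates its argument for every element,
    // so each partition gets the next value in the sequence.
    Array.fill(numPartitions)(random.nextLong())
  }

  def main(args: Array[String]): Unit = {
    val seeds = partitionSeeds(42L, 4)
    // The same RDD seed always reproduces the same per-partition seeds,
    // so recomputing a partition yields the same data.
    assert(seeds.sameElements(partitionSeeds(42L, 4)))
    seeds.zipWithIndex.foreach { case (s, i) => println(s"partition $i -> seed $s") }
  }
}
```

Unlike `seed + index`, uniqueness across partitions is not guaranteed by construction here, only overwhelmingly likely; that trade-off is what the reply in this thread weighs.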


---

[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
Github user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15213063
  
    --- Diff: mllib/src/main/scala/org/apache/spark/mllib/random/DistributionGenerator.scala ---
    @@ -0,0 +1,105 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import cern.jet.random.Poisson
    +import cern.jet.random.engine.DRand
    +
    +import org.apache.spark.util.random.{XORShiftRandom, Pseudorandom}
    +
    +/**
    + * Trait for random number generators that generate i.i.d values from a distribution.
    + */
    +trait DistributionGenerator extends Pseudorandom with Serializable {
    +
    +  /**
    +   * @return An i.i.d sample as a Double from an underlying distribution.
    +   */
    +  def nextValue(): Double
    +
    +  /**
    +   * @return A copy of the DistributionGenerator with a new instance of the rng object used in the
    +   *         class when applicable. Each partition has a unique seed and therefore requires its
    +   *         own instance of the DistributionGenerator.
    +   */
    +  def newInstance(): DistributionGenerator
    +}
    +
    +/**
    + * Generates i.i.d. samples from U[0.0, 1.0]
    + */
    +class UniformGenerator() extends DistributionGenerator {
    --- End diff --
    
    Is `()` necessary?
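
For context, a Scala class that takes no constructor parameters can be declared without the empty parameter list. The sketch below uses a simplified stand-in trait (`Gen`, hypothetical) rather than the PR's `DistributionGenerator`, and `java.util.Random` rather than `XORShiftRandom`, to show that both declarations behave the same.

```scala
import java.util.Random

// Simplified, hypothetical stand-in for the PR's DistributionGenerator trait.
trait Gen extends Serializable {
  def nextValue(): Double
}

// Declared with an explicit empty parameter list.
class WithParens() extends Gen {
  private val rng = new Random(0L)
  override def nextValue(): Double = rng.nextDouble()
}

// Declared without it; the class still has a zero-argument constructor.
class WithoutParens extends Gen {
  private val rng = new Random(0L)
  override def nextValue(): Double = rng.nextDouble()
}

object ParensDemo {
  // Both classes are instantiated the same way and, given the same fixed
  // seed, produce the same first value.
  def sameFirstValue: Boolean = {
    val a = new WithParens()
    val b = new WithoutParens
    a.nextValue() == b.nextValue()
  }
}
```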



[GitHub] spark pull request: [SPARK-2514] [mllib] Random RDD generator

Posted by mengxr <gi...@git.apache.org>.
GitHub user mengxr commented on a diff in the pull request:

    https://github.com/apache/spark/pull/1520#discussion_r15389681
  
    --- Diff: mllib/src/test/scala/org/apache/spark/mllib/random/RandomRDDGeneratorsSuite.scala ---
    @@ -0,0 +1,171 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one or more
    + * contributor license agreements.  See the NOTICE file distributed with
    + * this work for additional information regarding copyright ownership.
    + * The ASF licenses this file to You under the Apache License, Version 2.0
    + * (the "License"); you may not use this file except in compliance with
    + * the License.  You may obtain a copy of the License at
    + *
    + *    http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.spark.mllib.random
    +
    +import scala.collection.mutable.ArrayBuffer
    +
    +import org.scalatest.FunSuite
    +
    +import org.apache.spark.SparkContext._
    +import org.apache.spark.mllib.linalg.Vector
    +import org.apache.spark.mllib.rdd.{RandomRDDPartition, RandomRDD}
    +import org.apache.spark.mllib.util.LocalSparkContext
    +import org.apache.spark.rdd.RDD
    +import org.apache.spark.util.StatCounter
    +
    +/**
    + * Note: avoid including APIs that do not set the seed for the RNG in unit tests
    + * in order to guarantee deterministic behavior.
    + *
    + * TODO update tests to use TestingUtils for floating point comparison after PR 1367 is merged
    + */
    +class RandomRDDGeneratorsSuite extends FunSuite with LocalSparkContext with Serializable {
    +
    +  def testGeneratedRDD(rdd: RDD[Double],
    +      expectedSize: Long,
    +      expectedNumPartitions: Int,
    +      expectedMean: Double,
    +      expectedStddev: Double,
    +      epsilon: Double = 0.01) {
    +    val stats = rdd.stats()
    +    assert(expectedSize === stats.count)
    +    assert(expectedNumPartitions === rdd.partitions.size)
    +    assert(math.abs(stats.mean - expectedMean) < epsilon)
    +    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
    +  }
    +
    +  // assume test RDDs are small
    +  def testGeneratedVectorRDD(rdd: RDD[Vector],
    +      expectedRows: Long,
    +      expectedColumns: Int,
    +      expectedNumPartitions: Int,
    +      expectedMean: Double,
    +      expectedStddev: Double,
    +      epsilon: Double = 0.01) {
    +    assert(expectedNumPartitions === rdd.partitions.size)
    +    val values = new ArrayBuffer[Double]()
    +    rdd.collect.foreach { vector => {
    +      assert(vector.size === expectedColumns)
    +      values ++= vector.toArray
    +    }}
    +    assert(expectedRows === values.size / expectedColumns)
    +    val stats = new StatCounter(values)
    +    assert(math.abs(stats.mean - expectedMean) < epsilon)
    +    assert(math.abs(stats.stdev - expectedStddev) < epsilon)
    +  }
    +
    +  test("RandomRDD sizes") {
    +
    +    // some cases where size % numParts != 0 to test getPartitions behaves correctly
    +    for ((size, numPartitions) <- List((10000, 6), (12345, 1), (1000, 101))) {
    +      val rdd = new RandomRDD(sc, size, numPartitions, new UniformGenerator, 0L)
    +      assert(rdd.count() === size)
    +      assert(rdd.partitions.size === numPartitions)
    +
    +      // check that partition sizes are balanced
    +      val partSizes = rdd.partitions.map( p => p.asInstanceOf[RandomRDDPartition].size.toDouble)
    --- End diff --
    
    Style nit: remove the space after the opening parenthesis, i.e. `map( p => ...)` -> `map(p => ...)`.

