Posted to commits@spark.apache.org by sr...@apache.org on 2015/12/09 16:09:47 UTC

spark git commit: [SPARK-12031][CORE][BUG] Integer overflow when doing sampling

Repository: spark
Updated Branches:
  refs/heads/master f6883bb7a -> a11321686


[SPARK-12031][CORE][BUG] Integer overflow when doing sampling

Author: uncleGen <hu...@gmail.com>

Closes #10023 from uncleGen/1.6-bugfix.
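
The bug: SamplingUtils.reservoirSampleAndCount counted the items it had
seen with an Int. Once a partition holds more than Int.MaxValue
(2147483647) elements, the counter wraps to a negative value and
rand.nextInt(i) is called with a non-positive bound. A minimal,
standalone sketch of the failure mode (not taken from the patch):

  // Incrementing an Int past Int.MaxValue wraps to a negative value.
  var i = Int.MaxValue
  i += 1
  println(i)  // -2147483648
  // java.util.Random.nextInt rejects non-positive bounds, so the
  // sampling loop would fail once the counter overflows:
  // new java.util.Random().nextInt(i)  // IllegalArgumentException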


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1132168
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1132168
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1132168

Branch: refs/heads/master
Commit: a113216865fd45ea39ae8f104e784af2cf667dcf
Parents: f6883bb
Author: uncleGen <hu...@gmail.com>
Authored: Wed Dec 9 15:09:40 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Dec 9 15:09:40 2015 +0000

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/Partitioner.scala   |  4 ++--
 .../org/apache/spark/util/random/SamplingUtils.scala     | 11 ++++++-----
 2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a1132168/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index e4df7af..ef9a2da 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -253,7 +253,7 @@ private[spark] object RangePartitioner {
    */
   def sketch[K : ClassTag](
       rdd: RDD[K],
-      sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
+      sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
     val shift = rdd.id
     // val classTagK = classTag[K] // to avoid serializing the entire partitioner object
     val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
@@ -262,7 +262,7 @@ private[spark] object RangePartitioner {
         iter, sampleSizePerPartition, seed)
       Iterator((idx, n, sample))
     }.collect()
-    val numItems = sketched.map(_._2.toLong).sum
+    val numItems = sketched.map(_._2).sum
     (numItems, sketched)
   }
 

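With reservoirSampleAndCount returning a Long count, each sketched tuple
is now (partitionIndex: Int, count: Long, sample: Array[K]) and the
widening .toLong before the sum is no longer needed. A small sketch with
hypothetical per-partition counts, mirroring the collected shape:

  // Hypothetical counts larger than Int.MaxValue per partition.
  val sketched = Array((0, 3000000000L, Array("a")), (1, 2500000000L, Array("b")))
  val numItems = sketched.map(_._2).sum  // 5500000000L, summed as Long
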
http://git-wip-us.apache.org/repos/asf/spark/blob/a1132168/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
index c9a864a..f98932a 100644
--- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
@@ -34,7 +34,7 @@ private[spark] object SamplingUtils {
       input: Iterator[T],
       k: Int,
       seed: Long = Random.nextLong())
-    : (Array[T], Int) = {
+    : (Array[T], Long) = {
     val reservoir = new Array[T](k)
     // Put the first k elements in the reservoir.
     var i = 0
@@ -52,16 +52,17 @@ private[spark] object SamplingUtils {
       (trimReservoir, i)
     } else {
       // If input size > k, continue the sampling process.
+      var l = i.toLong
       val rand = new XORShiftRandom(seed)
       while (input.hasNext) {
         val item = input.next()
-        val replacementIndex = rand.nextInt(i)
+        val replacementIndex = (rand.nextDouble() * l).toLong
         if (replacementIndex < k) {
-          reservoir(replacementIndex) = item
+          reservoir(replacementIndex.toInt) = item
         }
-        i += 1
+        l += 1
       }
-      (reservoir, i)
+      (reservoir, l)
     }
   }
 

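The replacement index is now drawn over the full Long count of items
seen: (rand.nextDouble() * l).toLong is approximately uniform on [0, l),
and the reservoir is only written when the draw falls below k, so the
narrowing .toInt at that point cannot overflow. A usage sketch with
hypothetical sizes:

  import scala.util.Random

  val k = 1000                   // reservoir size (hypothetical)
  val l = 3000000000L            // items seen so far, beyond Int.MaxValue
  val replacementIndex = (Random.nextDouble() * l).toLong  // in [0, l)
  if (replacementIndex < k) {
    val slot = replacementIndex.toInt  // safe: replacementIndex < k
  }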
