You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2015/12/09 16:09:47 UTC
spark git commit: [SPARK-12031][CORE][BUG] Integer overflow when doing
sampling
Repository: spark
Updated Branches:
refs/heads/master f6883bb7a -> a11321686
[SPARK-12031][CORE][BUG] Integer overflow when doing sampling
Author: uncleGen <hu...@gmail.com>
Closes #10023 from uncleGen/1.6-bugfix.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a1132168
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a1132168
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a1132168
Branch: refs/heads/master
Commit: a113216865fd45ea39ae8f104e784af2cf667dcf
Parents: f6883bb
Author: uncleGen <hu...@gmail.com>
Authored: Wed Dec 9 15:09:40 2015 +0000
Committer: Sean Owen <so...@cloudera.com>
Committed: Wed Dec 9 15:09:40 2015 +0000
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/Partitioner.scala | 4 ++--
.../org/apache/spark/util/random/SamplingUtils.scala | 11 ++++++-----
2 files changed, 8 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a1132168/core/src/main/scala/org/apache/spark/Partitioner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/Partitioner.scala b/core/src/main/scala/org/apache/spark/Partitioner.scala
index e4df7af..ef9a2da 100644
--- a/core/src/main/scala/org/apache/spark/Partitioner.scala
+++ b/core/src/main/scala/org/apache/spark/Partitioner.scala
@@ -253,7 +253,7 @@ private[spark] object RangePartitioner {
*/
def sketch[K : ClassTag](
rdd: RDD[K],
- sampleSizePerPartition: Int): (Long, Array[(Int, Int, Array[K])]) = {
+ sampleSizePerPartition: Int): (Long, Array[(Int, Long, Array[K])]) = {
val shift = rdd.id
// val classTagK = classTag[K] // to avoid serializing the entire partitioner object
val sketched = rdd.mapPartitionsWithIndex { (idx, iter) =>
@@ -262,7 +262,7 @@ private[spark] object RangePartitioner {
iter, sampleSizePerPartition, seed)
Iterator((idx, n, sample))
}.collect()
- val numItems = sketched.map(_._2.toLong).sum
+ val numItems = sketched.map(_._2).sum
(numItems, sketched)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/a1132168/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
index c9a864a..f98932a 100644
--- a/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
+++ b/core/src/main/scala/org/apache/spark/util/random/SamplingUtils.scala
@@ -34,7 +34,7 @@ private[spark] object SamplingUtils {
input: Iterator[T],
k: Int,
seed: Long = Random.nextLong())
- : (Array[T], Int) = {
+ : (Array[T], Long) = {
val reservoir = new Array[T](k)
// Put the first k elements in the reservoir.
var i = 0
@@ -52,16 +52,17 @@ private[spark] object SamplingUtils {
(trimReservoir, i)
} else {
// If input size > k, continue the sampling process.
+ var l = i.toLong
val rand = new XORShiftRandom(seed)
while (input.hasNext) {
val item = input.next()
- val replacementIndex = rand.nextInt(i)
+ val replacementIndex = (rand.nextDouble() * l).toLong
if (replacementIndex < k) {
- reservoir(replacementIndex) = item
+ reservoir(replacementIndex.toInt) = item
}
- i += 1
+ l += 1
}
- (reservoir, i)
+ (reservoir, l)
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org