Posted to commits@spark.apache.org by sr...@apache.org on 2017/08/21 07:21:29 UTC
spark git commit: [SPARK-21782][CORE] Repartition creates skews when numPartitions is a power of 2
Repository: spark
Updated Branches:
refs/heads/master 28a6cca7d -> 77d046ec4
[SPARK-21782][CORE] Repartition creates skews when numPartitions is a power of 2
## Problem
When an RDD (particularly one with a low item-per-partition ratio) is repartitioned to a numPartitions that is a power of 2, the resulting partitions are very unevenly sized. This happens because the PRNG is initialized with a fixed seed (the partition index) and is then used only once, to pick the starting position. See details in https://issues.apache.org/jira/browse/SPARK-21782
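To illustrate, here is a minimal standalone sketch (not the Spark code itself; the bucket count of 64 and the 1000 indices are arbitrary demo values): seeding `Random` with the raw sequential indices and drawing once with a power-of-2 bound produces heavily clustered starting positions.

```scala
import scala.util.Random

// Old behaviour: seed the PRNG with the raw partition index and use it once.
// With a power-of-2 bound, the first draws from sequentially-seeded PRNGs are
// strongly correlated, so the starting positions are far from uniform.
val numPartitions = 64                      // a power of 2
val starts = (0 until 1000).map { index =>
  new Random(index).nextInt(numPartitions)  // the one and only draw per input partition
}
// Histogram of starting positions; many buckets get few or no hits.
starts.groupBy(identity).toSeq.sortBy(_._1).foreach { case (bucket, hits) =>
  println(s"$bucket -> ${hits.size}")
}
```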
## What changes were proposed in this pull request?
Instead of seeding `Random` directly with the sequential partition indices `0, 1, 2, ...`, hash them first with `scala.util.hashing.byteswap32()`.
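The same sketch with the proposed hashing applied (again with arbitrary demo values) spreads the starting positions out:

```scala
import scala.util.Random
import scala.util.hashing

// New behaviour: hash the sequential index before seeding, so that nearby
// indices produce unrelated seeds and the single draw is spread evenly.
val numPartitions = 64
val starts = (0 until 1000).map { index =>
  new Random(hashing.byteswap32(index)).nextInt(numPartitions)
}
starts.groupBy(identity).toSeq.sortBy(_._1).foreach { case (bucket, hits) =>
  println(s"$bucket -> ${hits.size}")  // counts now spread across all 64 buckets
}
```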
## How was this patch tested?
`build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.rdd.RDDSuite test`
Author: Sergey Serebryakov <ss...@tesla.com>
Closes #18990 from megaserg/repartition-skew.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77d046ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77d046ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77d046ec
Branch: refs/heads/master
Commit: 77d046ec47a9bfa6323aa014869844c28e18e049
Parents: 28a6cca
Author: Sergey Serebryakov <ss...@tesla.com>
Authored: Mon Aug 21 08:21:25 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Aug 21 08:21:25 2017 +0100
----------------------------------------------------------------------
core/src/main/scala/org/apache/spark/rdd/RDD.scala | 3 ++-
core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 6 ++++--
2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/77d046ec/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5435f59..8798dfc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
import scala.io.Codec
import scala.language.implicitConversions
import scala.reflect.{classTag, ClassTag}
+import scala.util.hashing
import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
@@ -448,7 +449,7 @@ abstract class RDD[T: ClassTag](
if (shuffle) {
/** Distributes elements evenly across output partitions, starting from a random partition. */
val distributePartition = (index: Int, items: Iterator[T]) => {
- var position = (new Random(index)).nextInt(numPartitions)
+ var position = (new Random(hashing.byteswap32(index))).nextInt(numPartitions)
items.map { t =>
// Note that the hash code of the key will just be the key itself. The HashPartitioner
// will mod it with the number of total partitions.
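For context, here is a simplified standalone sketch of the keying scheme this hunk modifies (not the exact Spark code; the helper name distribute is made up for illustration). The PRNG is consulted exactly once per input partition to pick a starting offset; each element then gets the next consecutive key, and the downstream HashPartitioner takes that key modulo numPartitions, yielding a round-robin spread:

    import scala.util.Random
    import scala.util.hashing

    // Simplified sketch of the distributePartition closure: one random draw per
    // input partition chooses the starting offset, then keys increase by one per
    // element, so elements land on consecutive output partitions (round-robin)
    // once the HashPartitioner takes the key modulo numPartitions.
    def distribute[T](index: Int, items: Iterator[T], numPartitions: Int): Iterator[(Int, T)] = {
      var position = new Random(hashing.byteswap32(index)).nextInt(numPartitions)
      items.map { t =>
        position += 1
        (position, t)
      }
    }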
http://git-wip-us.apache.org/repos/asf/spark/blob/77d046ec/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 386c006..e994d72 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -347,16 +347,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
val partitions = repartitioned.glom().collect()
// assert all elements are present
assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
- // assert no bucket is overloaded
+ // assert no bucket is overloaded or empty
for (partition <- partitions) {
val avg = input.size / finalPartitions
val maxPossible = avg + initialPartitions
- assert(partition.length <=  maxPossible)
+ assert(partition.length <= maxPossible)
+ assert(!partition.isEmpty)
}
}
testSplitPartitions(Array.fill(100)(1), 10, 20)
testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
+ testSplitPartitions(Array.fill(1000)(1), 250, 128)
}
test("coalesced RDDs") {