You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by sr...@apache.org on 2017/08/21 07:21:29 UTC

spark git commit: [SPARK-21782][CORE] Repartition creates skews when numPartitions is a power of 2

Repository: spark
Updated Branches:
  refs/heads/master 28a6cca7d -> 77d046ec4


[SPARK-21782][CORE] Repartition creates skews when numPartitions is a power of 2

## Problem
When an RDD (particularly one with a low items-per-partition ratio) is repartitioned to numPartitions = power of 2, the resulting partitions are very unevenly sized, because a fixed seed is used to initialize the PRNG and the PRNG is used only once. See details in https://issues.apache.org/jira/browse/SPARK-21782

## What changes were proposed in this pull request?
Instead of directly using `0, 1, 2,...` seeds to initialize `Random`, hash them with `scala.util.hashing.byteswap32()`.

## How was this patch tested?
`build/mvn -Dtest=none -DwildcardSuites=org.apache.spark.rdd.RDDSuite test`

Author: Sergey Serebryakov <ss...@tesla.com>

Closes #18990 from megaserg/repartition-skew.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/77d046ec
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/77d046ec
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/77d046ec

Branch: refs/heads/master
Commit: 77d046ec47a9bfa6323aa014869844c28e18e049
Parents: 28a6cca
Author: Sergey Serebryakov <ss...@tesla.com>
Authored: Mon Aug 21 08:21:25 2017 +0100
Committer: Sean Owen <so...@cloudera.com>
Committed: Mon Aug 21 08:21:25 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/org/apache/spark/rdd/RDD.scala      | 3 ++-
 core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala | 6 ++++--
 2 files changed, 6 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/77d046ec/core/src/main/scala/org/apache/spark/rdd/RDD.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/rdd/RDD.scala b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
index 5435f59..8798dfc 100644
--- a/core/src/main/scala/org/apache/spark/rdd/RDD.scala
+++ b/core/src/main/scala/org/apache/spark/rdd/RDD.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable.ArrayBuffer
 import scala.io.Codec
 import scala.language.implicitConversions
 import scala.reflect.{classTag, ClassTag}
+import scala.util.hashing
 
 import com.clearspring.analytics.stream.cardinality.HyperLogLogPlus
 import org.apache.hadoop.io.{BytesWritable, NullWritable, Text}
@@ -448,7 +449,7 @@ abstract class RDD[T: ClassTag](
     if (shuffle) {
       /** Distributes elements evenly across output partitions, starting from a random partition. */
       val distributePartition = (index: Int, items: Iterator[T]) => {
-        var position = (new Random(index)).nextInt(numPartitions)
+        var position = (new Random(hashing.byteswap32(index))).nextInt(numPartitions)
         items.map { t =>
           // Note that the hash code of the key will just be the key itself. The HashPartitioner
           // will mod it with the number of total partitions.

http://git-wip-us.apache.org/repos/asf/spark/blob/77d046ec/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
index 386c006..e994d72 100644
--- a/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
+++ b/core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala
@@ -347,16 +347,18 @@ class RDDSuite extends SparkFunSuite with SharedSparkContext {
       val partitions = repartitioned.glom().collect()
       // assert all elements are present
       assert(repartitioned.collect().sortWith(_ > _).toSeq === input.toSeq.sortWith(_ > _).toSeq)
-      // assert no bucket is overloaded
+      // assert no bucket is overloaded or empty
       for (partition <- partitions) {
         val avg = input.size / finalPartitions
         val maxPossible = avg + initialPartitions
-        assert(partition.length <=  maxPossible)
+        assert(partition.length <= maxPossible)
+        assert(!partition.isEmpty)
       }
     }
 
     testSplitPartitions(Array.fill(100)(1), 10, 20)
     testSplitPartitions(Array.fill(10000)(1) ++ Array.fill(10000)(2), 20, 100)
+    testSplitPartitions(Array.fill(1000)(1), 250, 128)
   }
 
   test("coalesced RDDs") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org