Posted to reviews@spark.apache.org by pwendell <gi...@git.apache.org> on 2014/05/11 03:26:03 UTC
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
GitHub user pwendell opened a pull request:
https://github.com/apache/spark/pull/727
SPARK-1770: Load balance elements when repartitioning.
This patch adds better balancing when performing a repartition of an
RDD. Previously the elements in the RDD were hash partitioned, meaning
if the RDD was skewed certain partitions would end up being very large.
This commit adds load balancing of elements across the repartitioned
RDD splits. The load balancing is not perfect: a given output partition
can have up to N more elements than the average if there are N input
partitions. However, some randomization is used to minimize the
probability that this happens.
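Below is a minimal, Spark-free Scala sketch of the balancing idea described above. It is not the patch itself, and it rests on stated assumptions. Each input partition picks a starting output partition from a Random seeded with its own index, then assigns its elements round-robin. The object and method names (LoadBalanceSketch, outputSizes) are hypothetical. Each input partition contributes either floor(size / numPartitions) or ceil(size / numPartitions) elements to every output partition. As a result, with N input partitions no output partition can drift N or more elements away from the average.

```scala
import scala.util.Random

// Hypothetical simulation of the balancing scheme (not Spark code): count how many
// elements each output partition would receive, without building any RDDs.
object LoadBalanceSketch {
  /** Returns the number of elements that land in each output partition. */
  def outputSizes(inputSizes: Seq[Int], numPartitions: Int): Array[Int] = {
    val counts = new Array[Int](numPartitions)
    for ((size, index) <- inputSizes.zipWithIndex) {
      // Random starting point, seeded by the input partition's index.
      var position = new Random(index).nextInt(numPartitions)
      for (_ <- 0 until size) {
        // Round-robin: note the parentheses, so the increment wraps correctly.
        position = (position + 1) % numPartitions
        counts(position) += 1
      }
    }
    counts
  }
}
```

Take a skewed input such as outputSizes(Seq(900, 50, 50), 2). It yields 500 elements in each output partition whatever the random starting offsets are, because every input size here divides evenly by 2.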
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/pwendell/spark load-balance
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/spark/pull/727.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #727
----
commit acfa46aad3140b7d10890b15f13519db684cd2b7
Author: Patrick Wendell <pw...@gmail.com>
Date: 2014-05-11T00:59:13Z
SPARK-1770: Load balance elements when repartitioning.
----
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42759620
Merged build started.
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42787862
Merged build started.
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/727#discussion_r12511628
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -328,11 +328,20 @@ abstract class RDD[T: ClassTag](
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
: RDD[T] = {
if (shuffle) {
+ /** Distributes elements evenly across output partitions, starting from a random partition. */
+ def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+ var position = (new Random(index)).nextInt(numPartitions)
+ items.map{ t =>
+ position = position + 1 % numPartitions
--- End diff --
Since `%` binds tighter than `+`, this computes `position + (1 % numPartitions)`. That just keeps increasing `position` without ever wrapping, which is probably not exactly what we want. In practice, passing just `position` as the key would be fine: its hashCode is `position` itself, and the Partitioner will mod that with `numPartitions`. No need to mod twice.
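Here is a small sketch of the precedence issue raised in this comment. The helper names (PrecedenceSketch, buggyNext, fixedNext, partitionFor) are hypothetical. partitionFor only mirrors the non-negative modulo that a hash partitioner applies to key.hashCode. It is an assumption for illustration, not Spark's code.

```scala
// Hypothetical helpers illustrating the review comment; not part of the patch.
object PrecedenceSketch {
  // `%` binds tighter than `+`, so this is position + (1 % numPartitions):
  // for numPartitions > 1 it simply increments and never wraps.
  def buggyNext(position: Int, numPartitions: Int): Int = position + 1 % numPartitions

  // Parenthesized version: wraps back to 0 after numPartitions - 1.
  def fixedNext(position: Int, numPartitions: Int): Int = (position + 1) % numPartitions

  // Non-negative modulo of the key's hashCode (for an Int key, hashCode is the value itself).
  def partitionFor(key: Int, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }
}
```

For example, fixedNext(4, 4) is 1, and partitionFor(5, 4) for the unwrapped position 5 is also 1. This is the reviewer's point: the partitioner already reduces the key, so wrapping `position` first changes nothing.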
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42759614
Merged build triggered.
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/727#discussion_r12511646
--- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
@@ -202,6 +202,39 @@ class RDDSuite extends FunSuite with SharedSparkContext {
assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
}
+ test("repartitioned RDDs perform load balancing") {
+ // Coalesce partitions
+ val input = Array.fill(1000)(1)
+ val initialPartitions = 10
+ val data = sc.parallelize(input, initialPartitions)
+
+ val repartitioned1 = data.repartition(2)
+ assert(repartitioned1.partitions.size == 2)
+ val partitions1 = repartitioned1.glom().collect()
+ // some noise in balancing is allowed due to randomization
+ assert(math.abs(partitions1(0).length - 500) < initialPartitions)
+ assert(math.abs(partitions1(1).length - 500) < initialPartitions)
+ assert(repartitioned1.collect() === input)
+
+ def testSplitPartitions(input: Seq[Int], initialPartitions: Int, finalPartitions: Int) {
+ val data = sc.parallelize(input, initialPartitions)
+ val repartitioned = data.repartition(finalPartitions)
+ assert(repartitioned.partitions.size == finalPartitions)
--- End diff --
Maybe use `===` here for a nicer failure message.
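The test in this diff feeds 1000 copies of the value 1. That is the worst case for the old behavior, because keying every element by itself sends all of them to a single hash partition. The sketch below uses plain Scala collections instead of an RDD, and its helper names (SkewCheck, hashPartition, isBalanced) are hypothetical. It shows that skew, along with the kind of tolerance check the test applies to the glom()'d partition sizes.

```scala
// Hypothetical helpers, not part of RDDSuite; plain collections stand in for RDD partitions.
object SkewCheck {
  /** Counts elements per partition when each element is hash partitioned by its own value. */
  def hashPartition(elements: Seq[Int], numPartitions: Int): Array[Int] = {
    val counts = new Array[Int](numPartitions)
    for (e <- elements) {
      val raw = e.hashCode % numPartitions
      counts(if (raw < 0) raw + numPartitions else raw) += 1
    }
    counts
  }

  /** True if every partition size is strictly within `tolerance` of the even split. */
  def isBalanced(partitionSizes: Seq[Int], tolerance: Int): Boolean = {
    val expected = partitionSizes.sum.toDouble / partitionSizes.size
    partitionSizes.forall(size => math.abs(size - expected) < tolerance)
  }
}
```

With 1000 identical elements and 2 partitions, hashPartition puts everything in partition 1, leaving partition 0 empty. That distribution fails the same "within initialPartitions of 500" check the test uses.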
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42787822
Looks good to me.
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42760174
All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14881/
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42788830
Merged build finished. All automated tests passed.
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:
https://github.com/apache/spark/pull/727#discussion_r12511624
--- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
@@ -328,11 +328,20 @@ abstract class RDD[T: ClassTag](
def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
: RDD[T] = {
if (shuffle) {
+ /** Distributes elements evenly across output partitions, starting from a random partition. */
+ def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
+ var position = (new Random(index)).nextInt(numPartitions)
+ items.map{ t =>
--- End diff --
Put a space before `{`
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42787858
Merged build triggered.
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:
https://github.com/apache/spark/pull/727
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42760173
Merged build finished. All automated tests passed.
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42759588
/cc @aarondav @mateiz
---
[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...
Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:
https://github.com/apache/spark/pull/727#issuecomment-42788831
All automated tests passed.
Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14894/
---