Posted to reviews@spark.apache.org by pwendell <gi...@git.apache.org> on 2014/05/11 03:26:03 UTC

[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

GitHub user pwendell opened a pull request:

    https://github.com/apache/spark/pull/727

    SPARK-1770: Load balance elements when repartitioning.

    This patch adds better balancing when performing a repartition of an
    RDD. Previously the elements in the RDD were hash partitioned, meaning
    if the RDD was skewed certain partitions would end up being very large.
    
    This commit adds load balancing of elements across the repartitioned
    RDD splits. The load balancing is not perfect: a given output partition
    can have up to N more elements than the average if there are N input
    partitions. However, some randomization is used to minimize the
    probability that this happens.
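
As a rough illustration of the balancing bound described above, here is a minimal sketch on plain Scala collections rather than RDDs. It is not the code from the patch: the names `RoundRobinSketch`, `distribute`, and `outputSizes` are hypothetical, and the only idea taken from the description is dealing elements round-robin from a randomized starting partition. Each input partition hands out counts that differ by at most one between output partitions, so with N input partitions the output sizes differ by at most N.

```scala
import scala.util.Random

object RoundRobinSketch {
  /** Tags each item of one input partition with an output partition index,
    * dealing round-robin from a start position seeded by the input index.
    * (Hypothetical helper for illustration, not the patch's implementation.) */
  def distribute[T](index: Int, items: Seq[T], numPartitions: Int): Seq[(Int, T)] = {
    val start = new Random(index).nextInt(numPartitions)
    items.zipWithIndex.map { case (item, i) => ((start + i) % numPartitions, item) }
  }

  /** Sizes of the output partitions after distributing every input partition. */
  def outputSizes[T](inputs: Seq[Seq[T]], numPartitions: Int): Seq[Int] = {
    val keys = inputs.zipWithIndex.flatMap { case (items, index) =>
      distribute(index, items, numPartitions).map(_._1)
    }
    (0 until numPartitions).map(p => keys.count(_ == p))
  }
}
```

For example, with heavily skewed inputs such as `Seq(Seq.fill(1000)(1), Seq.fill(3)(1), Seq.empty[Int])` and 4 output partitions, the largest and smallest output sizes differ by no more than 3, the number of input partitions.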

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/pwendell/spark load-balance

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/727.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #727
    
----
commit acfa46aad3140b7d10890b15f13519db684cd2b7
Author: Patrick Wendell <pw...@gmail.com>
Date:   2014-05-11T00:59:13Z

    SPARK-1770: Load balance elements when repartitioning.
    
    This patch adds better balancing when performing a repartition of an
    RDD. Previously the elements in the RDD were hash partitioned, meaning
    if the RDD was skewed certain partitions would end up being very large.
    
    This commit adds load balancing of elements across the repartitioned
    RDD splits. The load balancing is not perfect: a given output partition
    can have up to N more elements than the average if there are N input
    partitions. However, some randomization is used to minimize the
    probability that this happens.

----


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastructure@apache.org or file a JIRA ticket
with INFRA.
---

[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42759620
  
    Merged build started. 



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42787862
  
    Merged build started. 



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/727#discussion_r12511628
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -328,11 +328,20 @@ abstract class RDD[T: ClassTag](
       def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
           : RDD[T] = {
         if (shuffle) {
    +      /** Distributes elements evenly across output partitions, starting from a random partition. */
    +      def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
    +        var position = (new Random(index)).nextInt(numPartitions)
    +        items.map{ t =>
    +          position = position + 1 % numPartitions
    --- End diff --
    
    Because `%` binds tighter than `+`, this computes `1 % numPartitions` first and then adds it, so `position` just keeps increasing, which is probably not what we want. In practice, passing just `position` as the key would be fine: its hashCode is the value itself, and the Partitioner will mod that with `numPartitions`. No need to mod twice.
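
The precedence issue in the comment above can be checked in isolation. The sketch below is plain Scala with no Spark dependency; `PrecedenceDemo` and its methods are hypothetical names, and `partitionFor` is only an assumed stand-in for a partitioner that takes a non-negative mod of the key's hash code.

```scala
object PrecedenceDemo {
  // As written in the diff: `%` binds tighter than `+`,
  // so this evaluates as position + (1 % numPartitions).
  def stepAsWritten(position: Int, numPartitions: Int): Int =
    position + 1 % numPartitions

  // Explicitly parenthesized wrap-around increment.
  def stepWrapped(position: Int, numPartitions: Int): Int =
    (position + 1) % numPartitions

  // Assumed stand-in for a hash-based partitioner: non-negative mod of the
  // key's hash code. For an Int key, hashCode is the value itself.
  def partitionFor(key: Int, numPartitions: Int): Int = {
    val raw = key.hashCode % numPartitions
    if (raw < 0) raw + numPartitions else raw
  }
}
```

With `numPartitions = 4`, `stepAsWritten(3, 4)` yields 4 while `stepWrapped(3, 4)` yields 0, yet `partitionFor` maps both keys to partition 0, which is why an unbounded `position` still lands in the intended partition.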



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42759614
  
     Merged build triggered. 



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/727#discussion_r12511646
  
    --- Diff: core/src/test/scala/org/apache/spark/rdd/RDDSuite.scala ---
    @@ -202,6 +202,39 @@ class RDDSuite extends FunSuite with SharedSparkContext {
         assert(repartitioned2.collect().toSet === (1 to 1000).toSet)
       }
     
    +  test("repartitioned RDDs perform load balancing") {
    +    // Coalesce partitions
    +    val input = Array.fill(1000)(1)
    +    val initialPartitions = 10
    +    val data = sc.parallelize(input, initialPartitions)
    +
    +    val repartitioned1 = data.repartition(2)
    +    assert(repartitioned1.partitions.size == 2)
    +    val partitions1 = repartitioned1.glom().collect()
    +    // some noise in balancing is allowed due to randomization
    +    assert(math.abs(partitions1(0).length - 500) < initialPartitions)
    +    assert(math.abs(partitions1(1).length - 500) < initialPartitions)
    +    assert(repartitioned1.collect() === input)
    +
    +    def testSplitPartitions(input: Seq[Int], initialPartitions: Int, finalPartitions: Int) {
    +      val data = sc.parallelize(input, initialPartitions)
    +      val repartitioned = data.repartition(finalPartitions)
    +      assert(repartitioned.partitions.size == finalPartitions)
    --- End diff --
    
    Maybe use `===` here for a nicer failure message.



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42787822
  
    Looks good to me.



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42760174
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14881/



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42788830
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on a diff in the pull request:

    https://github.com/apache/spark/pull/727#discussion_r12511624
  
    --- Diff: core/src/main/scala/org/apache/spark/rdd/RDD.scala ---
    @@ -328,11 +328,20 @@ abstract class RDD[T: ClassTag](
       def coalesce(numPartitions: Int, shuffle: Boolean = false)(implicit ord: Ordering[T] = null)
           : RDD[T] = {
         if (shuffle) {
    +      /** Distributes elements evenly across output partitions, starting from a random partition. */
    +      def distributePartition(index: Int, items: Iterator[T]): Iterator[(Int, T)] = {
    +        var position = (new Random(index)).nextInt(numPartitions)
    +        items.map{ t =>
    --- End diff --
    
    Put a space before `{`



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42787858
  
     Merged build triggered. 



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by asfgit <gi...@git.apache.org>.
Github user asfgit closed the pull request at:

    https://github.com/apache/spark/pull/727



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42760173
  
    Merged build finished. All automated tests passed.



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42759588
  
    /cc @aarondav @mateiz 



[GitHub] spark pull request: SPARK-1770: Load balance elements when reparti...

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/727#issuecomment-42788831
  
    All automated tests passed.
    Refer to this link for build results: https://amplab.cs.berkeley.edu/jenkins/job/SparkPullRequestBuilder/14894/

