Posted to reviews@spark.apache.org by syedhashmi <gi...@git.apache.org> on 2014/05/26 03:04:56 UTC

[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

GitHub user syedhashmi opened a pull request:

    https://github.com/apache/spark/pull/876

    SPARK[1784]: Adding a balancedPartitioner

    This change adds a balanced partitioner to the existing partitioners. The new partitioner uses a round-robin strategy to allocate keys to partitions so that we end up with balanced partitions for an RDD.
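
    For orientation, here is a minimal, hypothetical sketch of what a round-robin
    partitioner of this kind could look like. The actual class added by this PR is
    not shown in this thread; the name and the counter-based approach below are
    assumptions:

        import java.util.concurrent.atomic.AtomicLong
        import org.apache.spark.Partitioner

        // Hypothetical sketch: assigns keys to partitions in cyclic order,
        // ignoring the key itself. The mutable counter makes getPartition
        // stateful, which is the property debated later in this thread.
        class BalancedPartitioner(partitions: Int) extends Partitioner {
          require(partitions > 0, "Number of partitions must be positive")
          private val counter = new AtomicLong(0)

          override def numPartitions: Int = partitions

          // Two calls with the same key may return different partitions.
          override def getPartition(key: Any): Int =
            (counter.getAndIncrement() % partitions).toInt
        }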

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/syedhashmi/spark master

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/spark/pull/876.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #876
    
----
commit 4ca94cc155aea4be36505d5f37d037e209078196
Author: Syed Hashmi <sh...@cloudera.com>
Date:   2014-05-09T23:32:32Z

    [SPARK-1784] Add a new partitioner
    
    This change adds a new partitioner which allows users
    to specify # of keys per partition.

commit 66680150aa705bf301f79367647e671cb5ef9e21
Author: CodingCat <zh...@gmail.com>
Date:   2014-05-10T04:50:23Z

    SPARK-1686: keep schedule() calling in the main thread
    
    https://issues.apache.org/jira/browse/SPARK-1686
    
    moved from original JIRA (by @markhamstra):
    
    In deploy.master.Master, the completeRecovery method is the last thing to be called when a standalone Master is recovering from failure. It is responsible for resetting some state, relaunching drivers, and eventually resuming its scheduling duties.
    
    There are currently four places in Master.scala where completeRecovery is called. Three of them are from within the actor's receive method, and aren't problems. The last starts from within receive when the ElectedLeader message is received, but the actual completeRecovery() call is made from the Akka scheduler. That means that it will execute on a different scheduler thread, and Master itself will end up running (i.e., schedule() ) from that Akka scheduler thread.
    
    In this PR, I added a new master message TriggerSchedule to trigger the "local" call of schedule() in the scheduler thread.
    
    Author: CodingCat <zh...@gmail.com>
    
    Closes #639 from CodingCat/SPARK-1686 and squashes the following commits:
    
    81bb4ca [CodingCat] rename variable
    69e0a2a [CodingCat] style fix
    36a2ac0 [CodingCat] address Aaron's comments
    ec9b7bb [CodingCat] address the comments
    02b37ca [CodingCat] keep schedule() calling in the main thread

commit fd36542c5dd2eaf8657e0d6aff65ab2365beef56
Author: Syed Hashmi <sh...@cloudera.com>
Date:   2014-05-26T00:55:17Z

    [SPARK-1784] Add a balanced partitioner
    
    This partitioner uses a round-robin allocation strategy for keys
    to end up with balanced partitions for an RDD.

commit 4354836bda0f8f3c5286fa244ea6a655b4cda386
Author: Syed Hashmi <sh...@cloudera.com>
Date:   2014-05-26T01:02:19Z

    Revert "SPARK-1686: keep schedule() calling in the main thread"
    
    This reverts commit 66680150aa705bf301f79367647e671cb5ef9e21.

----



[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

Posted by syedhashmi <gi...@git.apache.org>.
Github user syedhashmi commented on the pull request:

    https://github.com/apache/spark/pull/876#issuecomment-44318141
  
    @pwendell: You are right ... your patch addresses this scenario. Does it make sense to expose this functionality through a partitioner, since that is the intuitive way for most folks, or do you think that would be a duplication of logic?



[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/876#issuecomment-44226720
  
    @syedhashmi If you just want to shuffle stuff around randomly (i.e., you lose affinity of keys to specific partitions), then isn't it sufficient to just call `repartition` on the RDD? I added some code recently that does round-robin balancing when calling `repartition`:
    
    https://github.com/apache/spark/pull/727
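
    For reference, a minimal usage sketch of that approach (the RDD contents, app
    name, and partition counts below are illustrative, not taken from this PR or
    from #727):

        import org.apache.spark.{SparkConf, SparkContext}

        // Illustrative only: spread records evenly across 8 partitions.
        // repartition(n) is coalesce(n, shuffle = true); after the round-robin
        // rebalancing, the resulting RDD carries no Partitioner, so key-to-partition
        // affinity is intentionally lost.
        val sc = new SparkContext(
          new SparkConf().setAppName("example").setMaster("local[*]"))
        val skewed = sc.parallelize(1 to 1000000, 4)
        val balanced = skewed.repartition(8)
        println(balanced.partitions.length)  // 8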



[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

Posted by pwendell <gi...@git.apache.org>.
Github user pwendell commented on the pull request:

    https://github.com/apache/spark/pull/876#issuecomment-44226761
  
    Here is the specific code:
    https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L332



[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

Posted by aarondav <gi...@git.apache.org>.
Github user aarondav commented on the pull request:

    https://github.com/apache/spark/pull/876#issuecomment-44151265
  
    The current contract of `Partitioner` (though it's not documented, apparently...) is that it is expected to be idempotent and that if two keys are equivalent, they are assigned to the same partition. [PairRDDFunctions#lookup](https://github.com/ash211/spark/blob/sortby/core/src/main/scala/org/apache/spark/rdd/PairRDDFunctions.scala?pr=%2Fapache%2Fspark%2Fpull%2F369#L558) makes this assumption, for instance.
    
    It turns out this sort of balanced partitioning is useful, however, and we have encoded it explicitly within [RDD#coalesce()](https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/rdd/RDD.scala#L328). The semantics here match Spark's assumptions about partitioners -- i.e., the resultant RDD has no Partitioner, so no assumption can be made about the colocation of keys in order to do efficient lookups/groupBys/reduceByKeys.
    
    Would this sort of manual repartitioning suit your use-case? Otherwise it would require a rather significant overhaul to Spark's Partitioner semantics.
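
    To make that contract concrete, a small sketch (simplified, not code from
    Spark itself):

        import org.apache.spark.{HashPartitioner, Partitioner}

        // The implicit contract: getPartition must be a pure function of the key,
        // so equal keys always map to the same partition. HashPartitioner satisfies
        // this; a stateful round-robin partitioner would not.
        val p: Partitioner = new HashPartitioner(4)
        val first  = p.getPartition("user-42")
        val second = p.getPartition("user-42")
        assert(first == second)

        // lookup(key) relies on this: it computes getPartition(key) once and scans
        // only that partition. If equal keys could land anywhere, the values would
        // be scattered and the single-partition scan would miss them.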



[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

Posted by mateiz <gi...@git.apache.org>.
Github user mateiz commented on the pull request:

    https://github.com/apache/spark/pull/876#issuecomment-44497883
  
    This functionality doesn't fit the definition of a Partitioner as used in Spark (which requires it to consistently return the same partition for each key), so it would be confusing to expose it as such. The `repartition` and `coalesce` methods are the right way to do it.
    
    In particular, Partitioners are also used to decide whether you can optimize joins and lookups based on a key's partition. This would break that behavior.
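
    For illustration, a sketch of the join optimization being referred to (the
    names and data are made up, and `sc` is assumed to be an existing
    SparkContext):

        import org.apache.spark.HashPartitioner
        import org.apache.spark.SparkContext._  // pair RDD functions in Spark 1.x

        // When both pair RDDs share the same deterministic Partitioner, join()
        // can match keys partition-by-partition without an extra shuffle. A
        // partitioner that may return different partitions for equal keys would
        // silently break this, since matching keys need not be colocated.
        val part   = new HashPartitioner(8)
        val users  = sc.parallelize(Seq((1, "alice"), (2, "bob"))).partitionBy(part)
        val orders = sc.parallelize(Seq((1, "book"), (2, "lamp"))).partitionBy(part)

        assert(users.partitioner == orders.partitioner)
        val joined = users.join(orders)  // co-partitioned: no additional shuffle stage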



[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

Posted by syedhashmi <gi...@git.apache.org>.
Github user syedhashmi commented on the pull request:

    https://github.com/apache/spark/pull/876#issuecomment-44152706
  
    You are right, there are routines which make this assumption, but this is becoming a pain point for users: they end up with lopsided partitions, and especially if their dataset is huge, some larger partitions become a bottleneck and extend the tail of processing time. This partitioner explicitly targets such scenarios. If we agree on the general idea of the partitioner itself, I can add checks to the functions that assume hash or range partitioning behavior so that they treat the balanced partitioner as a general case. The user ends up with exactly balanced partitions and sacrifices a bit in lookup-type routines.



[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

Posted by syedhashmi <gi...@git.apache.org>.
Github user syedhashmi closed the pull request at:

    https://github.com/apache/spark/pull/876



[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

Posted by AmplabJenkins <gi...@git.apache.org>.
Github user AmplabJenkins commented on the pull request:

    https://github.com/apache/spark/pull/876#issuecomment-44150814
  
    Can one of the admins verify this patch?



[GitHub] spark pull request: SPARK[1784]: Adding a balancedPartitioner

Posted by srowen <gi...@git.apache.org>.
Github user srowen commented on the pull request:

    https://github.com/apache/spark/pull/876#issuecomment-44169636
  
    Yes, I had a similar question. This would calculate different partitions for the same key when called from different places and at different times, and I imagine that causes several methods to fail. For example, what about joining two RDDs that both use this partitioner (and with multiple partitions) -- anything that creates a shuffle dependency between two pair RDDs? Surely the different instances of the depended-upon RDD's partitioner will return different partitions for the same keys and get the answer wrong?

    I'm thinking of any time the partitioner instance is copied around -- it will copy its state, but that state, which is essential to its answers, then diverges. Maybe someone more knowledgeable than I am can confirm an easy way to test this, or that I really misunderstand and this never happens.

    `HashPartitioner` is expected to give fairly balanced partitions already, unless the key's hash function is bad. That is better fixed in the hash function itself.

    I had thought the problem was more often in pair RDDs where one key has a lot of values, and operations that group by key create imbalanced partitions? That's not the question here, right? That wouldn't be helped by this.
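
    To illustrate that last point, a hypothetical example of fixing skew in the
    hash function itself (the key class is invented for this sketch):

        // Hypothetical: if a key's hashCode only looks at a low-cardinality field,
        // HashPartitioner piles those keys into a few partitions. Mixing in the
        // high-cardinality field restores balance while keeping getPartition
        // deterministic, unlike a stateful round-robin partitioner.
        class DeviceKey(val region: String, val deviceId: Long) extends Serializable {
          // A skewed version would be: override def hashCode(): Int = region.hashCode
          override def hashCode(): Int = 31 * region.hashCode + deviceId.hashCode
          override def equals(other: Any): Boolean = other match {
            case d: DeviceKey => d.region == region && d.deviceId == deviceId
            case _            => false
          }
        }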

