Posted to issues@storm.apache.org by "Anis Nasir (JIRA)" <ji...@apache.org> on 2017/02/16 05:06:42 UTC

[jira] [Commented] (STORM-162) Load Balancing Shuffle Grouping

    [ https://issues.apache.org/jira/browse/STORM-162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15869185#comment-15869185 ] 

Anis Nasir commented on STORM-162:
----------------------------------

Dear all,

I have started looking into this issue and would like to propose a new grouping scheme that can handle both skew in the input stream and heterogeneity in the cluster.

The main idea is to use a modified version of consistent hashing that runs on each upstream operator. The upstream operator maintains the consistent hashing ring and uses it to map tuples to downstream operators.

Additionally, each downstream operator is allowed to send one of two messages to the upstream operator: "increase load" or "decrease load". Based on these signals, the upstream operator adds or removes virtual servers in the ring for that instance. This translates into forwarding fewer keys to overloaded machines and more keys to powerful machines.
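To make the idea concrete, here is a rough sketch of the ring I have in mind. The class and method names are only placeholders (this is not an existing Storm API); the point is just to show how the "increase load" / "decrease load" messages translate into adding or removing virtual servers:

    import java.nio.charset.StandardCharsets;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.TreeMap;
    import java.util.zip.CRC32;

    // Sketch: consistent hashing ring with a variable number of virtual
    // servers per downstream task (all names are placeholders).
    public class AdaptiveRing {
        private final TreeMap<Long, Integer> ring = new TreeMap<>();    // hash point -> task id
        private final Map<Integer, Integer> replicas = new HashMap<>(); // task id -> #virtual servers

        // Route a key to the task owning the next point on the ring (clockwise).
        public int chooseTask(String key) {
            Long point = ring.ceilingKey(hash(key));
            return ring.get(point != null ? point : ring.firstKey());
        }

        // "increase load": add one virtual server so the task receives more keys.
        public void increaseLoad(int task) {
            int r = replicas.getOrDefault(task, 0);
            ring.put(hash(task + ":" + r), task);
            replicas.put(task, r + 1);
        }

        // "decrease load": remove one virtual server so the task receives fewer keys,
        // but always keep at least one so no task is starved completely.
        public void decreaseLoad(int task) {
            int r = replicas.getOrDefault(task, 0);
            if (r > 1) {
                ring.remove(hash(task + ":" + (r - 1)));
                replicas.put(task, r - 1);
            }
        }

        private long hash(String s) {
            CRC32 crc = new CRC32();
            crc.update(s.getBytes(StandardCharsets.UTF_8));
            return crc.getValue();
        }
    }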

Currently, I am trying to run this algorithm using the queue length that is available at the upstream operators. However, I believe the scheme could be extended to incorporate other signals, such as service time, CPU utilization, and available memory.
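With the queue-length version, the adjustment loop at the upstream operator is basically just a pair of thresholds (the watermark values below are only examples and would need tuning):

    // Example thresholds; in practice these would be tuned or made adaptive.
    static final int HIGH_WATERMARK = 1000;
    static final int LOW_WATERMARK  = 100;

    // Called periodically for every downstream task with its current queue length.
    static void adjust(AdaptiveRing ring, int task, int queueLength) {
        if (queueLength > HIGH_WATERMARK) {
            ring.decreaseLoad(task);   // backlog building up -> route fewer keys to it
        } else if (queueLength < LOW_WATERMARK) {
            ring.increaseLoad(task);   // spare capacity -> route more keys to it
        }
    }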

Does this make sense?

Cheers,
Anis




> Load Balancing Shuffle Grouping
> -------------------------------
>
>                 Key: STORM-162
>                 URL: https://issues.apache.org/jira/browse/STORM-162
>             Project: Apache Storm
>          Issue Type: Wish
>          Components: storm-core
>            Reporter: James Xu
>            Assignee: Robert Joseph Evans
>            Priority: Minor
>             Fix For: 1.0.0
>
>
> https://github.com/nathanmarz/storm/issues/571
> Hey @nathanmarz,
> I think that the current shuffle grouping is creating very obvious hot-spots in load on hosts here at Twitter. The reason is that randomized message distribution to the workers is susceptible to the balls and bins problem:
> http://pages.cs.wisc.edu/~shuchi/courses/787-F07/scribe-notes/lecture07.pdf
> the odds that some particular queue gets bogged down when you're assigning tasks randomly are high. You can solve this problem with a load-aware shuffle grouping -- when shuffling, prefer tasks with lower load.
> What would it take to implement this feature?
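> Even something as simple as the classic "two choices" trick from the balls-and-bins literature would help: pick two candidate tasks at random and send to the less loaded one. A rough sketch (the per-task load map is hypothetical -- it assumes the grouping can see some backlog metric):
>
>     import java.util.List;
>     import java.util.Map;
>     import java.util.Random;
>
>     // Sketch of a "two choices" pick; degrades to plain random shuffle
>     // when no load information is available (all loads equal).
>     static int pickTask(List<Integer> tasks, Map<Integer, Long> pendingByTask, Random rnd) {
>         int a = tasks.get(rnd.nextInt(tasks.size()));
>         int b = tasks.get(rnd.nextInt(tasks.size()));
>         return pendingByTask.getOrDefault(a, 0L) <= pendingByTask.getOrDefault(b, 0L) ? a : b;
>     }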
> ----------
> sritchie: Looks like Rap Genius was heavily affected when Heroku started running a "shuffle grouping" on tasks to its dynos:
> http://rapgenius.com/James-somers-herokus-ugly-secret-lyrics
> 50x performance degradation over a more intelligent load-balancing scheme that only sent tasks to non-busy dynos. Seems very relevant to Storm.
> ----------
> nathanmarz: It's doing randomized round robin, not fully random distribution. So every downstream task gets the same number of messages. But yes, I agree that this would be a great feature. Basically what this requires is making stats of downstream tasks available to the stream grouping code. The best way to implement this would be:
> 1. Implement a broadcast message type in the networking code, so that one can efficiently send a large object to all tasks in a worker (rather than having to send N copies of that large message)
> 2. Have a single executor in every topology that polls nimbus for accumulated stats once per minute and then broadcasts that information to all tasks in all workers
> 3. Wire up the task code to pass that information along from the task to the outgoing stream groupings for that task (and adding appropriate methods to the CustomStreamGrouping interface to receive the stats info)
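> Once the stats reach the grouping code, the grouping itself is straightforward -- keep the latest broadcast load map and prefer the least loaded target. A rough sketch (the updateLoad callback stands in for the new CustomStreamGrouping method mentioned in step 3; it does not exist yet):
>
>     import java.util.Collections;
>     import java.util.HashMap;
>     import java.util.List;
>     import java.util.Map;
>
>     // Sketch only: a shuffle-like grouping that prefers the least loaded task.
>     public class LoadAwareShuffleSketch {
>         private List<Integer> targets;
>         private volatile Map<Integer, Double> load = new HashMap<>(); // task id -> load from broadcast stats
>
>         public void prepare(List<Integer> targetTasks) {
>             this.targets = targetTasks;
>         }
>
>         // Invoked when the once-per-minute broadcast of nimbus stats arrives (proposed API).
>         public void updateLoad(Map<Integer, Double> taskLoads) {
>             this.load = taskLoads;
>         }
>
>         public List<Integer> chooseTasks(List<Object> values) {
>             int best = targets.get(0);
>             for (int t : targets) {
>                 if (load.getOrDefault(t, 0.0) < load.getOrDefault(best, 0.0)) {
>                     best = t;
>                 }
>             }
>             return Collections.singletonList(best);
>         }
>     }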
> ----------
> sorenmacbeth: @nathanmarz @sritchie Did any progress ever get made on this? Is the description above still relevant to Storm 0.9.0? We are getting bitten by this problem and would love to see something like this implemented.


