You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@storm.apache.org by "Luke Forehand (JIRA)" <ji...@apache.org> on 2014/05/23 17:51:02 UTC

[jira] [Comment Edited] (STORM-162) Load Balancing Shuffle Grouping

    [ https://issues.apache.org/jira/browse/STORM-162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14007270#comment-14007270 ] 

Luke Forehand edited comment on STORM-162 at 5/23/14 3:49 PM:
--------------------------------------------------------------

We would love to see this feature implemented.  We have certain data in our stream that can trigger spikes in execution latency for a bolt.  This data cannot be separated into its own stream beforehand, so some bolt instances are hot spotting and queuing up.  This requires us to increase tuple timeout to allow the queued up tuples to reach the end of the topology before being replayed.  This is bad in failure scenarios because it takes forever for throughput to recover when workers are killed, as the lost messages have to hit the super high timeout before being replayed.


was (Author: lukeforehand):
We would love to see this feature implemented.  We have certain data in our stream that can trigger spikes in execution latency for a bolt.  This data cannot be separated into its own stream beforehand, so some bolt instances are hot spotting and queuing up.  This requires us to increase tuple timeout to allow the queued up tuples to reach the end of the topology before being replayed.  This is bad in failure scenarios because it takes forever for throughput to recover when workers are killed.

> Load Balancing Shuffle Grouping
> -------------------------------
>
>                 Key: STORM-162
>                 URL: https://issues.apache.org/jira/browse/STORM-162
>             Project: Apache Storm (Incubating)
>          Issue Type: Wish
>            Reporter: James Xu
>            Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/571
> Hey @nathanmarz,
> I think that the current shuffle grouping is creating very obvious hot-spots in load on hosts here at Twitter. The reason is that randomized message distribution to the workers is susceptible to the balls and bins problem:
> http://pages.cs.wisc.edu/~shuchi/courses/787-F07/scribe-notes/lecture07.pdf
> the odds that some particular queue gets bogged down when you're assigning tasks randomly is high. You can solve this problem with a load-aware shuffle grouping -- when shuffling, prefer tasks with lower load.
> What would it take to implement this feature?
> ----------
> sritchie: Looks like Rap Genius was heavily affected when Heroku started running a "shuffle grouping" on tasks to its dynos:
> http://rapgenius.com/James-somers-herokus-ugly-secret-lyrics
> 50x performance degradation over a more intelligent load-balancing scheme that only sent tasks to non-busy dynos. Seems very relevant to Storm.
> ----------
> nathanmarz: It's doing randomized round robin, not fully random distribution. So every downstream task gets the same number of messages. But yes, I agree that this would be a great feature. Basically what this requires is making stats of downstream tasks available to the stream grouping code. The best way to implement this would be:
> Implement a broadcast message type in the networking code, so that one can efficiently send a large object to all tasks in a worker (rather than having to send N copies of that large message)
> Have a single executor in every topology that polls nimbus for accumulated stats once per minute and then broadcasts that information to all tasks in all workers
> Wire up the task code to pass that information along from the task to the outgoing stream groupings for that task (and adding appropriate methods to the CustomStreamGrouping interface to receive the stats info)
> ----------
> sorenmacbeth: @nathanmarz @sritchie Did any progress ever get made on this? Is the description above still relevant to Storm 0.9.0. We are getting bitten by this problem and would love to see something like this implemented.



--
This message was sent by Atlassian JIRA
(v6.2#6252)