Posted to dev@storm.apache.org by "Robert Joseph Evans (JIRA)" <ji...@apache.org> on 2015/01/29 16:10:35 UTC

[jira] [Commented] (STORM-162) Load Balancing Shuffle Grouping

    [ https://issues.apache.org/jira/browse/STORM-162?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14296981#comment-14296981 ] 

Robert Joseph Evans commented on STORM-162:
-------------------------------------------

I have started working on a patch for this.  Right now I have the plumbing in place to send a "load" measurement for each bolt in the worker to a worker that is connected to it.  This is sent through the messaging layer, so there is no global state, only local state for what each worker is connected to. I have an alternative version of the shuffle grouping that uses pseudo-random numbers to pick a downstream bolt.  The probability of picking the downstream bolt is more or less inversely proportional to the load on the bolt.  All of that seems to be working fairly well, but where I am running into issues is what the "load" should truly be.  Currently I calculate it as 

{code}
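// Fraction of the bolt's input queue that is in use (0.0 to 1.0)
localBoltLoad = (double) boltInputQueuePopulation / boltInputQueueCapacity;
// Messages pending in the messaging client, capped at 1024 and scaled to 0.0 to 1.0
messagingClientLoad = min(1024, clientMessagesPending) / 1024.0;
// The overall load is the worse of the two
boltLoad = max(localBoltLoad, messagingClientLoad);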
{code}
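
To make the formula and the weighted pick concrete, here is a rough, self-contained sketch.  It is not the code from the patch.  The class name, the method names, and the EPSILON constant are all made up for illustration, and the exact weighting function is an assumption: the comment only says the probability is "more or less inversely proportional" to the load, so the sketch uses weight = 1 / (load + EPSILON).

```java
import java.util.Random;

/** Hypothetical sketch only; none of these names are Storm APIs. */
final class LoadAwareShuffleSketch {
    static final int MAX_PENDING = 1024;  // cap taken from the formula above
    static final double EPSILON = 0.1;    // assumption: keeps the weight finite at load 0

    /** Fraction of the bolt's input queue that is in use. */
    static double localBoltLoad(int queuePopulation, int queueCapacity) {
        return (double) queuePopulation / queueCapacity;
    }

    /** Pending client messages, capped and scaled to the range 0.0 to 1.0. */
    static double messagingClientLoad(int clientMessagesPending) {
        return (double) Math.min(MAX_PENDING, clientMessagesPending) / MAX_PENDING;
    }

    /** The overall load is the worse of the two measurements. */
    static double boltLoad(int queuePopulation, int queueCapacity, int clientMessagesPending) {
        return Math.max(localBoltLoad(queuePopulation, queueCapacity),
                        messagingClientLoad(clientMessagesPending));
    }

    /** Picks an index with probability proportional to 1 / (load + EPSILON). */
    static int pick(double[] loads, Random rng) {
        if (loads.length == 0) {
            throw new IllegalArgumentException("no downstream tasks");
        }
        double[] weights = new double[loads.length];
        double total = 0.0;
        for (int i = 0; i < loads.length; i++) {
            weights[i] = 1.0 / (loads[i] + EPSILON);
            total += weights[i];
        }
        // Walk the cumulative weights until the random draw is used up.
        double r = rng.nextDouble() * total;
        for (int i = 0; i < weights.length; i++) {
            r -= weights[i];
            if (r < 0.0) {
                return i;
            }
        }
        return weights.length - 1;  // guard against floating-point rounding
    }

    public static void main(String[] args) {
        double[] loads = {
            boltLoad(10, 100, 0),     // lightly loaded task
            boltLoad(90, 100, 2048),  // saturated task
        };
        System.out.println("picked task " + pick(loads, new Random()));
    }
}
```

With this weighting a saturated task still gets picked occasionally, so its load measurement keeps getting exercised instead of the task being starved outright.  Whether that is the right trade-off is part of the open question here.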


All of this seems to work OK, but I am a bit nervous about spikes in load, and also about overall latency.  The goal behind this is to be able to deal with heterogeneous hardware and heterogeneous load. With acking enabled and the current shuffle implementation, the total throughput of the topology is limited by the slowest path through it, and I would like to decouple them.  If we are polling the instantaneous load and there is a spike that we quickly recover from, the bolt will be slowed down for the entire polling period.  Also, if we are going off of queue length and we reach a stable point, inherently that stable point will have some number more entries in the queue for the slower bolts.  So any tuple that is unlucky enough to be assigned to a slow bolt will take a double latency hit: it will not only have to be processed by a slower bolt, but it will also have to wait in a longer line to do it. Do others think the capacity metric would be a better measure, or perhaps a combination of the two?
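
As one possible way to look at the spike concern, the instantaneous samples could be smoothed before they are reported.  The sketch below uses an exponential moving average.  That is a swapped-in idea for discussion, not something the patch does, and the class name and the alpha parameter are hypothetical.

```java
/** Hypothetical sketch: exponential moving average over polled load samples. */
final class SmoothedLoadSketch {
    private final double alpha;   // weight given to the newest sample, in (0, 1]
    private double value;         // current smoothed load
    private boolean initialized;  // false until the first sample arrives

    SmoothedLoadSketch(double alpha) {
        if (!(alpha > 0.0 && alpha <= 1.0)) {
            throw new IllegalArgumentException("alpha must be in (0, 1]");
        }
        this.alpha = alpha;
    }

    /** Folds one polled sample into the average and returns the new value. */
    double update(double sample) {
        if (!initialized) {
            value = sample;  // seed with the first sample instead of assuming 0
            initialized = true;
        } else {
            value = alpha * sample + (1.0 - alpha) * value;
        }
        return value;
    }

    double get() {
        return value;
    }
}
```

A smaller alpha makes a short spike count for less, but it also makes the reported load slower to reflect a bolt that really has fallen behind, so it only trades one of the problems above for the other.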

I think for now I am just going to try and get the framework in place, and a shuffle implementation that can be configured to the original behavior.  Then after that we can play around with it and see how it performs.  But I would like feedback on it if others have good ideas.  Especially [~azaroth] as he seems to have done a great deal of research into this area already.

> Load Balancing Shuffle Grouping
> -------------------------------
>
>                 Key: STORM-162
>                 URL: https://issues.apache.org/jira/browse/STORM-162
>             Project: Apache Storm
>          Issue Type: Wish
>            Reporter: James Xu
>            Assignee: Robert Joseph Evans
>            Priority: Minor
>
> https://github.com/nathanmarz/storm/issues/571
> Hey @nathanmarz,
> I think that the current shuffle grouping is creating very obvious hot-spots in load on hosts here at Twitter. The reason is that randomized message distribution to the workers is susceptible to the balls and bins problem:
> http://pages.cs.wisc.edu/~shuchi/courses/787-F07/scribe-notes/lecture07.pdf
> the odds that some particular queue gets bogged down when you're assigning tasks randomly is high. You can solve this problem with a load-aware shuffle grouping -- when shuffling, prefer tasks with lower load.
> What would it take to implement this feature?
> ----------
> sritchie: Looks like Rap Genius was heavily affected when Heroku started running a "shuffle grouping" on tasks to its dynos:
> http://rapgenius.com/James-somers-herokus-ugly-secret-lyrics
> 50x performance degradation over a more intelligent load-balancing scheme that only sent tasks to non-busy dynos. Seems very relevant to Storm.
> ----------
> nathanmarz: It's doing randomized round robin, not fully random distribution. So every downstream task gets the same number of messages. But yes, I agree that this would be a great feature. Basically what this requires is making stats of downstream tasks available to the stream grouping code. The best way to implement this would be:
> * Implement a broadcast message type in the networking code, so that one can efficiently send a large object to all tasks in a worker (rather than having to send N copies of that large message).
> * Have a single executor in every topology that polls nimbus for accumulated stats once per minute and then broadcasts that information to all tasks in all workers.
> * Wire up the task code to pass that information along from the task to the outgoing stream groupings for that task (and add appropriate methods to the CustomStreamGrouping interface to receive the stats info).
> ----------
> sorenmacbeth: @nathanmarz @sritchie Did any progress ever get made on this? Is the description above still relevant to Storm 0.9.0? We are getting bitten by this problem and would love to see something like this implemented.


