Posted to dev@storm.apache.org by "Rick Kellogg (JIRA)" <ji...@apache.org> on 2015/10/09 02:17:28 UTC

[jira] [Updated] (STORM-31) Auto-tune max spout pending

     [ https://issues.apache.org/jira/browse/STORM-31?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Rick Kellogg updated STORM-31:
------------------------------
    Component/s: storm-core

> Auto-tune max spout pending
> ---------------------------
>
>                 Key: STORM-31
>                 URL: https://issues.apache.org/jira/browse/STORM-31
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-core
>            Reporter: James Xu
>              Labels: HighPriority
>
> https://github.com/nathanmarz/storm/issues/385
> It should be relatively easy to have spouts automatically tune max spout pending so that topologies can handle high throughputs with minimal tuning. A spout should look at the average complete latency over the last 10 minutes and compare that to the message timeout. If it's significantly lower, it should increase pending. If it's close, it should decrease.
> --------------------------------------------------------------------------------------------------
> @jasonjckn: So as you know, there's no way to count the # of replays in storm, or more generally, there's no storm spout concept of consumption progress. There is the # of acked tuples, but that doesn't differentiate replays. I think that design choice is actually appropriate when you consider that consumption means different things for kafka, for the variety of other spouts that can exist, etc. However, it does cause problems for any auto-tune max spout pending algorithm I could devise from the current executor stats. For example, with kafka a particular offset references a block of messages compressed together, not individual messages, and when a particular tuple fails, that entire block of tuples gets replayed, and possibly other blocks that came afterward. Now imagine a kafka block of 1000 tuples and a 1% chance of any given tuple failing: this can lead to zero progress, because every single kafka block ends up with at least one failed tuple. Yet it still shows a nice ratio of # acks / # emits, because all of the acks are replays. Contrast this with user logic that does some kind of batching and batched database updates; that tends to have the property that entire batches succeed or fail (this is how trident works). So if 4 out of 5 batches succeed, and a batch includes 1 kafka block from each partition, then you're making a lot of progress, but your # acks / # emits ratio is 4/5. So: lower emit ratio, but a lot of progress is being made.
> So here's an idea to solve this:
> Expose setMaxSpoutPending to spout implementations so they can call it whenever they want. Then the user would enable auto-tune max spout pending as a KafkaSpout option or KestrelSpout option (instead of a topology config).
> Then you would write a generic max spout pending algorithm which would act as a thermostat on 'consumption throughput'. 'Amount consumed' is an abstract concept, and it's the input to this algorithm: it is assumed to be a monotonically increasing number that indicates durable consumption occurring. So in the kafka implementation you would plug in this algorithm, and 'amount consumed' would be defined as the oldest offset of a kafka block that has been acked.
> I think moving this logic into the spout implementation makes a lot of sense, since consumption means different things depending on whatever is backing the spout. I thought about introducing tuple replays as a concept in storm, but there's no easy way to track this at the tuple level in kafka, only at the kafka block level. This also makes a lot of sense in trident, because the # of pending tuples is actually the number of parallel batches: if you set max spout pending to 5, that's 5 parallel batches. By allowing the trident master coordinator spout to set its own max spout pending, it could do something smart to optimize the number of parallel batches.
> As a side note, a kafka block may contain any number of messages, and kafka offsets jump in steps of the block size (block size is not fixed either); it's basically just an offset on disk. So auto-tuned max spout pending would maximize not message throughput but compressed data throughput. I don't think this matters one way or the other; you could always maximize message throughput by counting the number of messages in each block, but that's a more complex implementation. I'd just do the simpler one first (the oldest unacked kafka offset).
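
For illustration, here is a minimal, self-contained sketch of the latency-based heuristic from the issue description (compare the rolling average complete latency against the message timeout and nudge max spout pending up or down). The class name, thresholds, and step sizes are assumptions for the sketch, not part of any existing Storm API.

    // Sketch only: adjusts a pending limit based on observed complete latency
    // versus the message timeout. Thresholds (0.5x / 0.8x) and growth/shrink
    // factors are illustrative assumptions.
    public class PendingAutoTuner {
        private final long messageTimeoutMs;   // e.g. topology message timeout in ms
        private int maxSpoutPending;

        public PendingAutoTuner(long messageTimeoutMs, int initialPending) {
            this.messageTimeoutMs = messageTimeoutMs;
            this.maxSpoutPending = initialPending;
        }

        // Called periodically with the average complete latency observed over
        // the last window (e.g. 10 minutes); returns the new pending limit.
        public int adjust(double avgCompleteLatencyMs) {
            if (avgCompleteLatencyMs < 0.5 * messageTimeoutMs) {
                // Plenty of headroom before tuples would time out: allow more in flight.
                maxSpoutPending = (int) Math.ceil(maxSpoutPending * 1.5);
            } else if (avgCompleteLatencyMs > 0.8 * messageTimeoutMs) {
                // Latency is approaching the timeout: back off to avoid replays.
                maxSpoutPending = Math.max(1, maxSpoutPending / 2);
            }
            return maxSpoutPending;
        }
    }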
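The "thermostat on consumption throughput" proposed in the comment could look roughly like the sketch below: a generic controller fed a monotonically increasing 'amount consumed' reading (e.g. the oldest acked kafka offset), raising the pending limit while durable progress is being made and backing off when it stalls. The ConsumptionThermostat class and the idea of a spout setting its own pending limit are assumptions taken from the proposal, not existing Storm APIs.

    // Sketch only: a spout implementation would call update(...) periodically and
    // apply the returned value via the proposed setMaxSpoutPending hook.
    public class ConsumptionThermostat {
        private long lastAmountConsumed = Long.MIN_VALUE;
        private int pending;
        private final int minPending;
        private final int maxPending;

        public ConsumptionThermostat(int initialPending, int minPending, int maxPending) {
            this.pending = initialPending;
            this.minPending = minPending;
            this.maxPending = maxPending;
        }

        // amountConsumed is assumed monotonically increasing, e.g. the oldest
        // acked kafka offset; returns the new max-spout-pending value.
        public int update(long amountConsumed) {
            boolean progress = amountConsumed > lastAmountConsumed;
            lastAmountConsumed = Math.max(lastAmountConsumed, amountConsumed);
            if (progress) {
                pending = Math.min(maxPending, pending + 1);   // consumption advancing: probe upward
            } else {
                pending = Math.max(minPending, pending / 2);   // no durable progress: back off sharply
            }
            return pending;
        }
    }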



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)