Posted to issues@storm.apache.org by "Roshan Naik (JIRA)" <ji...@apache.org> on 2019/01/17 09:13:00 UTC

[jira] [Commented] (STORM-3314) Acker redesign

    [ https://issues.apache.org/jira/browse/STORM-3314?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16744813#comment-16744813 ] 

Roshan Naik commented on STORM-3314:
------------------------------------

_Quoting thoughts expressed by_ [~kabhwan] _in response to the above initial idea:_

 

*Essential idea:* Try to avoid sending ACK tuples to other workers, since doing so incurs serde and network cost. If we could also get rid of field grouping, that may be better.

Roshan brought a great idea about it, which is in line with my essential idea, so there may be a trade-off between Roshan’s and mine. My idea is inspired by Roshan’s and modified a bit to adopt the nice parts of his idea.

 

*Summary:* Build a partial tuple tree in the Acker of each worker, and let each worker’s Acker handle its own partial tuple tree. Note that we don’t change the basic idea of the acking mechanism.

 

*Detail:*
 * When a tuple is emitted from a spout, the spout sends an ACK_INIT message to the Acker within its worker.
 ** Same as current.
 * The logic for emitting tuples in a Bolt doesn’t change.
 ** We may want to include in the tuple the ID of the acker to send the acknowledgement to.
 ** Otherwise, we could also compute (and cache) it on the bolt side (based on the source task ID) so as not to grow the size of each tuple.
 * When a worker receives a tuple from outside the worker, it also sends an ACK_INIT message to the Acker within the worker.
 ** The Acker stores additional information (acker task ID for the source task’s worker, tuple ID of the source tuple, anchors, etc.) to “send back ACK to source” when the partial tuple tree is completed.
 ** The information may differ a bit depending on whether it refers to a spout or to another acker.
 * Whenever a tuple is acknowledged in a Bolt, an ACK message is sent to the Acker within the worker.
 * When an acker finds that its tuple tree is completed, it sends an ACK back to the source acker based on the stored information.
 ** If the source is a Spout, just do the same as what we are doing now.
 

*Expected benefits:*
 * ACK tuples between hops within a worker will not be sent to other workers, hence avoiding the cost of serde and network transfer.
 * No field grouping.
 * ACKs follow the path along which tuples flow, which could be optimized by the grouper implementation.

 

*Considerations:*
 * Each worker has to have its own acker task, which is a new requirement even though it is the default for now.
 ** We could (and should) still be able to turn off the acker when needed, but the acker configuration needs to change to an on/off switch rather than an acker task count.
 * The total count of ACK tuples is not changed.
 ** Maybe we could avoid sending ACK tuples within the worker and just calculate and update what the acker does on the fly (in the executor thread), if that gives more benefit.
 * A bit more memory is needed, given that we maintain a tuple tree in each worker (linear in the number of workers the tuple tree flows through), and we need additional information for sending the ACK back to another acker.
 * Need to consider how the FAIL tuple will work, and how the RESET_TIMEOUT tuple will work.
 ** They are going to be headaches to consider for both my idea and Roshan’s.
 ** I’m still not sure how tuple timeout works with backpressure, even though we still need it to get rid of old entries in the rotating tree.

 

*Differences from Roshan’s idea:*
 * We don’t add ACK tuples to the source task’s queue, hence keeping the bolt’s task and the acker’s task separate.
 ** Two perspectives: queue, (executor) thread.
 ** It will be less coupled with backpressure. IMHO, ACK tuples and metrics tuples should flow regardless of ongoing backpressure.
 ** I think keeping them separate leaves open the possibility of applying more optimizations to the acker, like having a separate channel for ACK tuples and metrics tuples.
 * It uses less memory if there are multiple tasks in a worker: that is just what the acker excels at.
 * Fewer hops for ACK tuples when they flow back, given that only ackers are involved. (Yes, more hops when acking within a worker, though.)
 * We need to handle the rotating tree in only one place (the acker) per worker, rather than in each task.
 * More complicated than Roshan’s idea: Roshan’s idea is clearly intuitive, and I can’t imagine an easier solution.

 

 

> Acker redesign
> --------------
>
>                 Key: STORM-3314
>                 URL: https://issues.apache.org/jira/browse/STORM-3314
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-client
>            Reporter: Roshan Naik
>            Priority: Major
>
> *Context:* The ACKing mechanism has come into focus as one of the next major bottlenecks to address. The strategy to timeout and replay tuples has issues discussed in STORM-2359.
> *Basic idea:* Every bolt will send an ACK msg to its upstream spout/bolt once the tuples it emitted have been *fully processed* by downstream bolts.
> *Determining “fully processed”:* For every incoming (parent) tuple, a bolt can emit 0 or more “child” tuples. The parent tuple is considered fully processed once the bolt receives ACKs for all the child emits (if any). This basic idea cascades all the way back up to the spout that emitted the root of the tuple tree.
> This means that when a bolt is finished with all the child emits and calls ack(), no ACK message will be generated (unless there were 0 child emits). The ack() marks the completion of all child emits for a parent tuple. The bolt will emit an ACK to its upstream component once all the ACKs from downstream components have been received.
> *Operational changes:* The existing spouts and bolts don’t need any change. The bolt executor will need to process incoming acks from downstream bolts and send an ACK to its upstream component as needed. In the case of 0 child emits, ack() itself could immediately send the ACK to the upstream component. Field grouping is not applied to ACK messages.
> *Total ACK messages:* The spout output collector will no longer send an ACK-init message to the ACKer bolt. Other than this, the total number of emitted ACK messages does not change. Instead of the ACKs going to an ACKer bolt, they get spread out among the existing bolts. It appears that this mode may reduce some of the inter-worker traffic of ACK messages.
> *Memory use:* If we use the existing XOR logic from ACKer bolt, we need about 20 bytes per outstanding tuple-tree at each bolt. Assuming an average of say 50k outstanding tuples at each level, we have 50k*20bytes = 1MB per bolt instance. There may be room to do something better than XOR, since we only need to track one level of outstanding emits at each bolt.
> *Replay:* [needs more thinking] One option is to send REPLAY or TIMEOUT msgs upstream. Policy of when to emit them needs more thought. Good to avoid Timeouts/replays of inflight tuples under backpressure since this will lead to "event tsunami" at the worst possible time. Ideally, if possible, replay should be avoided unless tuples have been dropped. Would be nice to avoid sending TIMEOUT_RESET msgs upstream when under backpressure ... since they are likely to face backpressure as well.
> On receiving ACKs or REPLAYs from downstream components, a bolt needs to clear the corresponding 20 bytes of tracking info.
>  
> *Concerns:* ACK tuples traversing upstream means it takes longer to get back to Spout.
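
The per-bolt tracking in the quoted description can be sketched roughly as follows. This is only an illustration under assumptions: the class {{ParentTrackerSketch}} and its method names are hypothetical, tuple IDs are assumed to be random non-zero 64-bit values (so an accidental XOR of zero is improbable), and replay or timeout handling is omitted. Each method returns true when the bolt should send an ACK to its upstream component.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical one-level tracker kept by each bolt executor; not part of Storm. */
final class ParentTrackerSketch {

    /** Tracking state for one incoming (parent) tuple. */
    private static final class State {
        long xor;            // XOR of the IDs of child tuples not yet ACKed
        boolean ackCalled;   // true once the bolt has called ack() on the parent
    }

    private final Map<Long, State> outstanding = new HashMap<>();

    /** The bolt emitted a child tuple anchored to the given parent. */
    void onEmit(long parentId, long childId) {
        outstanding.computeIfAbsent(parentId, id -> new State()).xor ^= childId;
    }

    /** The bolt called ack() on the parent: all child emits for it are done. */
    boolean onAckCalled(long parentId) {
        State state = outstanding.computeIfAbsent(parentId, id -> new State());
        state.ackCalled = true;
        return finishIfDone(parentId, state);
    }

    /** An ACK for a child tuple arrived from a downstream bolt. */
    boolean onChildAck(long parentId, long childId) {
        State state = outstanding.get(parentId);
        if (state == null) {
            return false; // unknown parent, or tracking info already cleared
        }
        state.xor ^= childId;
        return finishIfDone(parentId, state);
    }

    /** Clears the tracking info and signals an upstream ACK once everything is in. */
    private boolean finishIfDone(long parentId, State state) {
        if (state.ackCalled && state.xor == 0L) {
            outstanding.remove(parentId);
            return true;
        }
        return false;
    }

    int outstandingCount() {
        return outstanding.size();
    }
}
```

With zero child emits, {{onAckCalled}} returns true immediately, matching the case where ack() itself sends the ACK upstream. Otherwise the upstream ACK is deferred until the last child ACK arrives.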



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)