You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2018/12/17 15:06:00 UTC

[jira] [Commented] (STORM-2359) Revising Message Timeouts

    [ https://issues.apache.org/jira/browse/STORM-2359?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16723055#comment-16723055 ] 

Stig Rohde Døssing commented on STORM-2359:
-------------------------------------------

I've been taking a look at this, and have a proposal for how we could move the timeout entirely to the acker.

I'm going to assume in the following that tuples sent from one task to another are received in the order they were sent, ignoring message loss. I think this is the case, but please correct me if it isn't.
h6. The problem

Both the acker and spout executor currently have rotating pending maps, which time out tuples based on received ticks. The acker pending map just discards the tuple, while the spout pending map fails them if they get rotated out. The reason this currently happens in both acker and spout is that we need to ensure that the spout fails tuples if they time out, even in the presence of message loss.

If we were to move timeouts entirely to the acker, there would be a potential for "lost" tuples (ones that end up not being reemitted) if messages between the acker and spout get lost.
 * The spout may emit a new tuple, and try to notify the acker about it. If this message is lost, the acker doesn't know about the tuple, and thus can't time it out.
 * The acker may send an ack or fail message back to the spout, which may get lost. Since the acker currently deletes acked/failed messages as soon as they are acked/failed, this would prevent the message from being replayed.

h6. Suggested solution

We can move the timeout logic to the acker, and it will work out of the box as long as there are no lost messages. I think we can account for lost messages by having the spout executor periodically update its view of pending tuples based on the state in the acker.

Say the spout pending root ids are A, and the acker pending root ids are B. The spout can periodically (e.g. once per second) send to each acker the root ids in A. The acker should respond to this tuple by sending back A - B (it can optionally also delete anything in B that is not in A). The spout can safely fail any tuple in A - B which is also in the spout pending when the sync tuple response is received.
 * If a tuple is in A and B, it is still pending, and shouldn't be removed. Returning only A - B ensures pending tuples remain.
 * If a tuple is in A but not B, the ack_init message was lost, the acker may have crashed and restarted or the tuple has simply been acked or failed.
 ** If the ack_init was lost, or the acker crashed, then the spout should replay the message. Since A - B contains the tuple, the spout will fail it when it receives the response.
 ** If the tuple was acked, then the acker is guaranteed to have sent the ack message on to the spout before handling the sync tuple. Due to message ordering, the ack will be processed before the sync tuple response, making the presence of the tuple in A - B irrelevant.
 * If a tuple is not in A but in B, then the spout may have crashed. The acker can optionally just discard the pending tuple without notifying the spout, since notifying the spout about the state of a tuple emitted by a different instance is a no op.

h6. Benefit

Moving the timeout logic to the acker makes it much cheaper to reset tuple timeouts, since the spout no longer needs to be notified directly. We could likely make the acker reset the timeout automatically any time it receives an ack.

 

Depending on overhead, we might be able to add an option to let Storm reset timeouts for tuples that are still being actively processed (i.e. received by a bolt but not yet acked/failed). This would be beneficial to avoid the event tsunami problem described in the linked design doc. It could help prevent the type of degenerate case described at https://issues.apache.org/jira/browse/STORM-2359?focusedCommentId=16043409&page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-16043409.
h6. Cost

There is a bit of extra overhead to doing this. The spout needs to keep track of which acker tasks are responsible for which root ids. There is also the (fairly minor) overhead of sending the sync tuples back and forth. In terms of processing reliability, a tuple the acker considers failed/timed out will be failed in the spout once one of the sync tuples make it through.

> Revising Message Timeouts
> -------------------------
>
>                 Key: STORM-2359
>                 URL: https://issues.apache.org/jira/browse/STORM-2359
>             Project: Apache Storm
>          Issue Type: Sub-task
>          Components: storm-core
>    Affects Versions: 2.0.0
>            Reporter: Roshan Naik
>            Priority: Major
>
> A revised strategy for message timeouts is proposed here.
> Design Doc:
>  https://docs.google.com/document/d/1am1kO7Wmf17U_Vz5_uyBB2OuSsc4TZQWRvbRhX52n5w/edit?usp=sharing



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)