You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2017/08/26 21:01:03 UTC

[jira] [Updated] (STORM-2549) The fix for STORM-2343 is incomplete, and the spout can still get stuck on failed tuples

     [ https://issues.apache.org/jira/browse/STORM-2549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Stig Rohde Døssing updated STORM-2549:
--------------------------------------
    Component/s: storm-kafka-client

> The fix for STORM-2343 is incomplete, and the spout can still get stuck on failed tuples
> ----------------------------------------------------------------------------------------
>
>                 Key: STORM-2549
>                 URL: https://issues.apache.org/jira/browse/STORM-2549
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 2.0.0, 1.1.0
>            Reporter: Stig Rohde Døssing
>            Assignee: Stig Rohde Døssing
>          Time Spent: 2h
>  Remaining Estimate: 0h
>
> Example:
> Say maxUncommittedOffsets is 10, maxPollRecords is 5, and the committedOffset is 0.
> The spout will initially emit up to offset 10, because it is allowed to poll until numNonRetriableTuples is >= maxUncommittedOffsets
> The spout will be allowed to emit another 5 tuples if offset 10 fails, so if that happens, offsets 10-14 will get emitted. If offset 1 fails and 2-14 get acked, the spout gets stuck because it will count the "extra tuples" 11-14 in numNonRetriableTuples.
> An similar case is the one where maxPollRecords doesn't divide maxUncommittedOffsets evenly. If it were 3 in the example above, the spout might just immediately emit offsets 1-12. If 2-12 get acked, offset 1 cannot be reemitted.
> The proposed solution is the following:
> * Enforce maxUncommittedOffsets on a per partition basis (i.e. actual limit will be multiplied by the number of partitions) by always allowing poll for retriable tuples that are within maxUncommittedOffsets tuples of the committed offset. Pause any non-retriable partitions if the partition has passed the maxUncommittedOffsets limit, and some other partition is polling for retries while also at the maxUncommittedOffsets limit. 
> Example of this functionality:
> MaxUncommittedOffsets is 100
> MaxPollRecords is 10
> Committed offset for partition 0 and 1 is 0.
> Partition 0 has emitted 0
> Partition 1 has emitted 0...95, 97, 99, 101, 103 (some offsets compacted away)
> Partition 1, message 99 is retriable
> We check that message 99 is within 100 emitted tuples of offset 0 (it is the 97th tuple after offset 0, so it is)
> We do not pause partition 0 because that partition isn't at the maxUncommittedOffsets limit.
> Seek to offset 99 on partition 1 and poll
> We get back offset 99, 101, 103 and potentially 7 new tuples. Say the lowest of these is at offset 104.
> The spout emits offset 99, filters out 101 and 103 because they were already emitted, and emits the 7 new tuples.
> If offset 104 (or later) become retriable, they are not retried until the committed offset moves. This is because offset 104 is the 101st tuple emitted after offset 0, so it isn't allowed to retry until the committed offset moves.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)