Posted to issues@storm.apache.org by "Stig Rohde Døssing (JIRA)" <ji...@apache.org> on 2016/09/20 20:30:22 UTC

[jira] [Comment Edited] (STORM-2106) Storm Kafka Client is paused while failed tuples are replayed

    [ https://issues.apache.org/jira/browse/STORM-2106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15507662#comment-15507662 ] 

Stig Rohde Døssing edited comment on STORM-2106 at 9/20/16 8:29 PM:
--------------------------------------------------------------------

[~jfenc91] I don't think it works that way. doSeekRetriableTopicPartitions only seeks back to the committed offset for x's TopicPartition if retryService.retriableTopicPartitions returns it (the for-loop otherwise doesn't include x's TopicPartition), and it only returns it if there are failed tuples ready for retry on that TopicPartition. When x is ready for retry, retryService.retriableTopicPartitions returns x's TopicPartition, the consumer seeks to x's offset and is polled, and x should then be emitted. When x is emitted, emitTupleIfNotEmitted removes the messageId from retryService, which should prevent retryService.retriableTopicPartitions from returning x's TopicPartition until x (or some other message on x's TopicPartition) fails again.
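The interaction described above can be modeled with a minimal sketch (hypothetical class and method names, not the real Storm code; the actual logic lives in KafkaSpout and KafkaSpoutRetryExponentialBackoff): a partition is only "retriable" while it has a failed message whose retry is due, and re-emitting that message removes it from the retry service again.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical model of the retry/emit interaction. Partitions are modeled as
// plain strings and backoff timing is omitted for brevity.
class RetryServiceModel {
    // partition -> offsets of failed messages currently ready for retry
    private final Map<String, Set<Long>> readyForRetry = new HashMap<>();

    // A tuple on this partition failed and its backoff has expired.
    void fail(String partition, long offset) {
        readyForRetry.computeIfAbsent(partition, p -> new TreeSet<>()).add(offset);
    }

    // Mirrors retryService.retriableTopicPartitions(): only partitions with
    // ready failed tuples are returned, so doSeekRetriableTopicPartitions
    // never seeks a partition that has nothing to retry.
    Set<String> retriableTopicPartitions() {
        Set<String> result = new HashSet<>();
        for (Map.Entry<String, Set<Long>> e : readyForRetry.entrySet()) {
            if (!e.getValue().isEmpty()) {
                result.add(e.getKey());
            }
        }
        return result;
    }

    // Mirrors emitTupleIfNotEmitted removing the messageId from the retry
    // service: once x is re-emitted it is in flight again, so its partition
    // stops being retriable until x (or another message on it) fails again.
    void emitted(String partition, long offset) {
        Set<Long> offsets = readyForRetry.get(partition);
        if (offsets != null) {
            offsets.remove(offset);
        }
    }
}
```

In this model, a partition enters the seek loop only between a tuple failing and that tuple being re-emitted, which is the behavior the comment argues the spout already has.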

While x is still being processed, the spout shouldn't seek back to x (and doesn't, as far as I can tell). Is it possible that you were limited by something else, such as maxSpoutPending, retrying a large number of tuples, or partition reassignments/worker restarts?

See https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java#L294 and https://github.com/apache/storm/blob/master/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java#L164

edit: I suppose if you have a failed tuple reasonably far back in the tuple stream, with a bunch of acked tuples to skip past, you might end up wasting a lot of time waiting for Storm to skip the acked tuples, because Storm waits 1 ms between calls to nextTuple by default if nothing is emitted. Maybe that is why calling emitTupleIfNotEmitted in a loop made a difference (assuming it made a difference)?
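To make the cost concrete: if each empty nextTuple call skips roughly one already-acked record and incurs the default 1 ms sleep, the wasted time grows linearly with the number of acked tuples to skip. A rough back-of-the-envelope calculation (assumed numbers and a hypothetical helper, not Storm code):

```java
// Hypothetical estimate of the time wasted skipping already-acked tuples one
// empty nextTuple() call at a time. Assumes one skipped record per call and
// Storm's default 1 ms sleep when nextTuple() emits nothing.
class SkipCostEstimate {
    static long skipTimeMillis(long ackedTuplesToSkip, long sleepMillisPerEmptyCall) {
        return ackedTuplesToSkip * sleepMillisPerEmptyCall;
    }
}
```

Under those assumptions, skipping past 10,000 acked tuples would cost on the order of 10 seconds, which would explain why draining the skips in a single nextTuple call (the loop around emitTupleIfNotEmitted) helps.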



> Storm Kafka Client is paused while failed tuples are replayed
> -------------------------------------------------------------
>
>                 Key: STORM-2106
>                 URL: https://issues.apache.org/jira/browse/STORM-2106
>             Project: Apache Storm
>          Issue Type: Bug
>            Reporter: Jeff Fenchel
>
> With the changes in STORM-2087, the kafka 10 spout will be limited to emitting tuples that are within a single poll() batch from kafka. This means that if the first tuple in a batch from kafka is failed, the spout will not emit beyond the size of that batch until the tuple is either processed successfully or given up on. This behavior is exacerbated by the exponential backoff retry policy.
> There probably needs to be bookkeeping for the next emittable offset.
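The bookkeeping the report suggests could look something like the following sketch (entirely hypothetical names, not a proposed patch): track, per partition, the next offset that has not yet been emitted, so that after seeking back to retry a failed tuple the spout can skip records that are already in flight instead of being capped by one poll() batch.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical per-partition bookkeeping of the next emittable offset.
// Partitions are modeled as plain strings for brevity.
class NextEmittableOffsetTracker {
    private final Map<String, Long> nextEmittable = new HashMap<>();

    // Called whenever a record is emitted downstream.
    void onEmit(String partition, long offset) {
        // Never move the high-water mark backwards when re-emitting retries.
        nextEmittable.merge(partition, offset + 1, Math::max);
    }

    // A polled record should be emitted if it is a scheduled retry, or if it
    // is at or beyond the next emittable offset; anything below that mark was
    // already emitted and is still pending, so it should be skipped.
    boolean shouldEmit(String partition, long offset, boolean isScheduledRetry) {
        return isScheduledRetry || offset >= nextEmittable.getOrDefault(partition, 0L);
    }
}
```

With something like this, a seek back to a failed tuple's offset would not force the spout to re-wait on every acked-but-not-yet-passed record in between.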



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)