Posted to issues@storm.apache.org by "P. Taylor Goetz (JIRA)" <ji...@apache.org> on 2016/12/01 18:44:58 UTC

[jira] [Resolved] (STORM-2229) KafkaSpout does not resend failed tuples

     [ https://issues.apache.org/jira/browse/STORM-2229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

P. Taylor Goetz resolved STORM-2229.
------------------------------------
    Resolution: Duplicate

Resolving as duplicate. STORM-2077 updated to reflect proper component.

> KafkaSpout does not resend failed tuples
> ----------------------------------------
>
>                 Key: STORM-2229
>                 URL: https://issues.apache.org/jira/browse/STORM-2229
>             Project: Apache Storm
>          Issue Type: Bug
>          Components: storm-kafka-client
>    Affects Versions: 1.0.0, 1.0.1, 1.0.2
>            Reporter: Matthias Klein
>
> When the topology fails a tuple, the KafkaSpout never resends it. This is easy to reproduce with a small topology that fails every tuple.
> Apparent reason:
> {code}
> public class KafkaSpout<K, V> extends BaseRichSpout {
>     // ...
>     private void doSeekRetriableTopicPartitions() {
>         final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
>         for (TopicPartition rtp : retriableTopicPartitions) {
>             final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
>             if (offsetAndMeta != null) {
>                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
>             } else {
>                 kafkaConsumer.seekToEnd(toArrayList(rtp));    // Seek to last committed offset <== Does seek to end of partition
>             }
>         }
>     }
>     // ...
> }
> {code}
> The code seeks to the end of the partition instead of seeking to the first uncommitted offset.
> Preliminary fix (worked for me, but needs to be checked by an expert):
> {code}
>     private void doSeekRetriableTopicPartitions() {
>         final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
>         for (TopicPartition rtp : retriableTopicPartitions) {
>             final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
>             if (offsetAndMeta != null) {
>                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
>             } else {
>                 OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
>                 if (committed == null) {
>                     // No offsets committed yet for this partition - start from beginning
>                     kafkaConsumer.seekToBeginning(toArrayList(rtp));
>                 } else {
>                     // Seek to first uncommitted offset
>                     kafkaConsumer.seek(rtp, committed.offset() + 1);
>                 }
>                 }
>             }
>         }
>     }
> {code}
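
The seek decision in the preliminary fix quoted above can be sketched without any Storm or Kafka dependencies. This is only an illustration under stated assumptions, not the spout's actual code: the class `SeekTargetSketch`, the method `seekTarget`, and the sentinel `SEEK_TO_BEGINNING` are hypothetical names, offsets are plain `Long` values rather than `OffsetAndMetadata`, and, as the `+ 1` in the quoted fix implies, a stored offset is assumed to be the last processed offset rather than the next one to read.

```java
// Hypothetical, dependency-free model of the three-way seek decision in the
// quoted fix. Not the real KafkaSpout code.
class SeekTargetSketch {

    // Hypothetical sentinel standing in for kafkaConsumer.seekToBeginning(...).
    static final long SEEK_TO_BEGINNING = -1L;

    /**
     * @param nextCommitOffset offset ready to be committed in the next commit
     *                         cycle, or null if there is none
     * @param committedOffset  last offset committed for the partition, or null
     *                         if nothing has been committed yet
     * @return the offset to seek to, or SEEK_TO_BEGINNING
     */
    static long seekTarget(Long nextCommitOffset, Long committedOffset) {
        if (nextCommitOffset != null) {
            // Resume right after the offset that is ready to commit.
            return nextCommitOffset + 1;
        }
        if (committedOffset == null) {
            // Nothing committed yet for this partition: start from the beginning.
            return SEEK_TO_BEGINNING;
        }
        // Assumption: the committed offset is the last processed offset,
        // so the first uncommitted offset is one past it.
        return committedOffset + 1;
    }

    public static void main(String[] args) {
        System.out.println(seekTarget(41L, 10L));   // prints 42
        System.out.println(seekTarget(null, 10L));  // prints 11
        System.out.println(seekTarget(null, null)); // prints -1
    }
}
```

In none of the three cases does the result move to the end of the partition, which is the behavior the report identifies as the cause of failed tuples never being replayed.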



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)