Posted to issues@storm.apache.org by "Matthias Klein (JIRA)" <ji...@apache.org> on 2016/12/01 07:09:58 UTC

[jira] [Created] (STORM-2229) KafkaSpout does not resend failed tuples

Matthias Klein created STORM-2229:
-------------------------------------

             Summary: KafkaSpout does not resend failed tuples
                 Key: STORM-2229
                 URL: https://issues.apache.org/jira/browse/STORM-2229
             Project: Apache Storm
          Issue Type: Bug
          Components: storm-kafka-client
    Affects Versions: 1.0.0, 1.0.1, 1.0.2
            Reporter: Matthias Klein


When the topology fails a tuple, the KafkaSpout never resends it. This can easily be shown by constructing a small topology that fails every tuple, as sketched below.
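
A minimal reproduction sketch (the bolt and its name are illustrative, not part of the original report): a bolt that fails every tuple it receives, wired behind the KafkaSpout.

{code}
import java.util.Map;

import org.apache.storm.task.OutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichBolt;
import org.apache.storm.tuple.Tuple;

/** Bolt that fails every tuple, so the spout should eventually replay all of them. */
public class AlwaysFailBolt extends BaseRichBolt {
    private OutputCollector collector;

    @Override
    public void prepare(Map conf, TopologyContext context, OutputCollector collector) {
        this.collector = collector;
    }

    @Override
    public void execute(Tuple tuple) {
        collector.fail(tuple);  // never ack: the spout is expected to re-emit this tuple
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // no output streams
    }
}
{code}

Wiring this bolt behind the spout (e.g. {{builder.setBolt("fail", new AlwaysFailBolt()).shuffleGrouping("kafka");}}) and watching the consumer offsets shows that the failed tuples are never re-emitted.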

Apparent reason:

{code}
public class KafkaSpout<K, V> extends BaseRichSpout {
//...
private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
            } else {
                kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1);    // Seek to last committed offset <== Does seek to end of partition
            }
        }
    }
{code}

In the else branch, the code seeks to the end of the partition instead of to the first uncommitted offset, so the failed tuples are never fetched again.

Preliminary fix (it worked for me, but needs to be checked by an expert):

{code}
    private void doSeekRetriableTopicPartitions() {
        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();

        for (TopicPartition rtp : retriableTopicPartitions) {
            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
            if (offsetAndMeta != null) {
                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
            } else {
                OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
                if (committed == null) {
                    // No offsets committed yet for this partition - start from the beginning
                    kafkaConsumer.seekToBeginning(toArrayList(rtp));
                } else {
                    // Seek to the first uncommitted offset
                    kafkaConsumer.seek(rtp, committed.offset() + 1);
                }
            }
        }
    }
{code}
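
The difference to the current implementation is that the fix asks Kafka itself for the last committed offset via {{kafkaConsumer.committed(rtp)}}, and falls back to {{seekToBeginning}} when nothing has been committed for the partition yet, instead of relying on the spout's internal {{committedOffset}} bookkeeping ({{toArrayList(rtp)}} is presumably a small helper that wraps the single partition in a collection).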




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)