Posted to issues@storm.apache.org by "P. Taylor Goetz (JIRA)" <ji...@apache.org> on 2016/12/01 18:44:58 UTC
[jira] [Resolved] (STORM-2229) KafkaSpout does not resend failed tuples
[ https://issues.apache.org/jira/browse/STORM-2229?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
P. Taylor Goetz resolved STORM-2229.
------------------------------------
Resolution: Duplicate
Resolving as duplicate. STORM-2077 updated to reflect proper component.
> KafkaSpout does not resend failed tuples
> ----------------------------------------
>
> Key: STORM-2229
> URL: https://issues.apache.org/jira/browse/STORM-2229
> Project: Apache Storm
> Issue Type: Bug
> Components: storm-kafka-client
> Affects Versions: 1.0.0, 1.0.1, 1.0.2
> Reporter: Matthias Klein
>
> When the topology fails a tuple, it is never resent by the KafkaSpout. This can easily be shown by constructing a small topology failing every tuple.
> Apparent reason:
> {code}
> public class KafkaSpout<K, V> extends BaseRichSpout {
>     //...
>     private void doSeekRetriableTopicPartitions() {
>         final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
>         for (TopicPartition rtp : retriableTopicPartitions) {
>             final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
>             if (offsetAndMeta != null) {
>                 kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
>             } else {
>                 kafkaConsumer.seekToEnd(toArrayList(rtp)); // Seek to last committed offset <== actually seeks to the end of the partition
>             }
>         }
>     }
>     //...
> }
> {code}
> The code seeks to the end of the partition instead of seeking to the first uncommitted offset.
> Preliminary fix (worked for me, but needs to be checked by an expert):
> {code}
> private void doSeekRetriableTopicPartitions() {
>     final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
>     for (TopicPartition rtp : retriableTopicPartitions) {
>         final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
>         if (offsetAndMeta != null) {
>             kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
>         } else {
>             OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
>             if (committed == null) {
>                 // No offsets committed yet for this partition - start from beginning
>                 kafkaConsumer.seekToBeginning(toArrayList(rtp));
>             } else {
>                 // Seek to first uncommitted offset
>                 kafkaConsumer.seek(rtp, committed.offset() + 1);
>             }
>         }
>     }
> }
> {code}
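For illustration only, the difference between the reported behavior and the preliminary fix can be modelled without Kafka or Storm on the classpath. The sketch below is not taken from the KafkaSpout source: the class name SeekTargetSketch, both method names, and the plain long offsets are hypothetical stand-ins. The "+ 1" mirrors the code quoted above, which assumes the stored offset is that of the last processed record; a consumer that commits the offset of the next record to read would presumably seek to the committed offset without adding one.

```java
import java.util.OptionalLong;

/** Stdlib-only model of the seek decision; all names here are hypothetical. */
final class SeekTargetSketch {

    /** Models the reported behavior: with nothing ready to commit, jump to the partition end. */
    static long reportedSeekTarget(OptionalLong nextCommitOffset, long endOffset) {
        if (nextCommitOffset.isPresent()) {
            return nextCommitOffset.getAsLong() + 1;
        }
        return endOffset; // failed records before the end are never polled again
    }

    /** Models the preliminary fix: fall back to the committed offset, then to the beginning. */
    static long proposedSeekTarget(OptionalLong nextCommitOffset,
                                   OptionalLong committedOffset,
                                   long beginningOffset) {
        if (nextCommitOffset.isPresent()) {
            return nextCommitOffset.getAsLong() + 1;
        }
        if (committedOffset.isPresent()) {
            // Assumes the committed value is the last processed offset, as in the quoted code.
            return committedOffset.getAsLong() + 1;
        }
        return beginningOffset; // nothing committed yet for this partition
    }

    public static void main(String[] args) {
        // Assumed scenario: offsets 0..9 exist, offset 2 was committed, offset 3 failed,
        // and no acked offsets are ready to commit.
        long reported = reportedSeekTarget(OptionalLong.empty(), 10L);
        long proposed = proposedSeekTarget(OptionalLong.empty(), OptionalLong.of(2L), 0L);
        System.out.println("reported seek target: " + reported); // prints "reported seek target: 10"
        System.out.println("proposed seek target: " + proposed); // prints "proposed seek target: 3"
    }
}
```

In this model the reported logic positions the consumer at offset 10, past the failed record at offset 3, while the proposed logic positions it at offset 3 so the failed record is fetched again.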
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)