You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@storm.apache.org by "Matthias Klein (JIRA)" <ji...@apache.org> on 2016/12/01 07:09:58 UTC
[jira] [Created] (STORM-2229) KafkaSpout does not resend failed
tuples
Matthias Klein created STORM-2229:
-------------------------------------
Summary: KafkaSpout does not resend failed tuples
Key: STORM-2229
URL: https://issues.apache.org/jira/browse/STORM-2229
Project: Apache Storm
Issue Type: Bug
Components: storm-kafka-client
Affects Versions: 1.0.0, 1.0.1, 1.0.2
Reporter: Matthias Klein
When the topology fails a tuple, it is never resent by the KafkaSpout. This can easily be shown by constructing a small topology failing every tuple.
Apparent reason:
{code}
public class KafkaSpout<K, V> extends BaseRichSpout {
//...
private void doSeekRetriableTopicPartitions() {
final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
for (TopicPartition rtp : retriableTopicPartitions) {
final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
if (offsetAndMeta != null) {
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
} else {
kafkaConsumer.seek(rtp, acked.get(rtp).committedOffset + 1); // Seek to last committed offset <== Does seek to end of partition
}
}
}
{code}
The code seeks to the end of the partition instead of seeking to the first uncommited offset.
Preliminary fix (worked for me, but needs to be checked by an expert)
{code}
private void doSeekRetriableTopicPartitions() {
final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
for (TopicPartition rtp : retriableTopicPartitions) {
final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
if (offsetAndMeta != null) {
kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1); // seek to the next offset that is ready to commit in next commit cycle
} else {
OffsetAndMetadata committed = kafkaConsumer.committed(rtp);
if(committed == null) {
// No offsets commited yet for this partition - start from beginning
kafkaConsumer.seekToBeginning(toArrayList(rtp));
} else {
// Seek to first uncommitted offset
kafkaConsumer.seek(rtp, committed.offset() + 1);
}
}
}
}
{code}
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)