Posted to commits@storm.apache.org by sr...@apache.org on 2018/12/11 18:39:00 UTC
[4/8] storm git commit: STORM-3301: Fix case where KafkaSpout could emit tuples that were already committed
STORM-3301: Fix case where KafkaSpout could emit tuples that were already committed
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8bb01856
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8bb01856
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8bb01856
Branch: refs/heads/master
Commit: 8bb0185651dfd0e085d9dda4bc0298a1bd8742fb
Parents: 94cd157
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Dec 8 22:15:04 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Dec 9 11:54:41 2018 +0100
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 35 ++++++------
.../kafka/spout/internal/OffsetManager.java | 1 +
.../KafkaSpoutLogCompactionSupportTest.java | 37 +++++++++++++
.../kafka/spout/KafkaSpoutSingleTopicTest.java | 56 ++++++++++++++++++++
4 files changed, 113 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
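The scenario this commit addresses, per the comments in the diff below: when a tuple fails and the consumer seeks back to retry it, the spout re-polls records it already emitted while it "catches up". If a commit lands during that window, records below the new committed offset can still be sitting in the waiting-to-emit list and be emitted a second time. The fix makes the pruning of such records unconditional, rather than running it only when the consumer position fell behind the committed offset. Below is a minimal, self-contained Java sketch of that pruning; the names are illustrative stand-ins, not the spout's actual internals.

import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.stream.Collectors;

public class PruneCommittedSketch {

    // Stand-in for ConsumerRecord; only the offset matters for this sketch.
    static class Rec {
        final long offset;
        Rec(long offset) { this.offset = offset; }
        @Override public String toString() { return "Rec(" + offset + ")"; }
    }

    // Keep only records at or above the committed offset. This mirrors the
    // filter that the commit moves out of the "position fell behind" branch
    // so that it runs whenever a commit has happened.
    static List<Rec> pruneCommitted(List<Rec> waitingToEmit, long committedOffset) {
        return waitingToEmit.stream()
            .filter(r -> r.offset >= committedOffset)
            .collect(Collectors.toCollection(LinkedList::new));
    }

    public static void main(String[] args) {
        List<Rec> waiting = Arrays.asList(new Rec(3), new Rec(4), new Rec(5), new Rec(6));
        long committedOffset = 5; // a commit landed while the consumer was catching up
        // Without unconditional pruning, offsets 3 and 4 could be emitted again
        // even though they are already committed.
        System.out.println(pruneCommitted(waiting, committedOffset)); // [Rec(5), Rec(6)]
    }
}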
http://git-wip-us.apache.org/repos/asf/storm/blob/8bb01856/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 5ebef80..27a531c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -215,13 +215,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
waitingToEmit.keySet().retainAll(partitions);
Set<TopicPartition> newPartitions = new HashSet<>(partitions);
+ // If this partition was previously assigned to this spout,
+ // leave the acked offsets and consumer position as they were to resume where it left off
newPartitions.removeAll(previousAssignment);
for (TopicPartition newTp : newPartitions) {
final OffsetAndMetadata committedOffset = consumer.committed(newTp);
final long fetchOffset = doSeek(newTp, committedOffset);
LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
- // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) {
offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset));
}
@@ -526,18 +527,21 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
* The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
* than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
* part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
- * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
+ * to the committed offset.
*/
LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
position, committedOffset);
consumer.seek(tp, committedOffset);
- List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
- if (waitingToEmitForTp != null) {
- //Discard the pending records that are already committed
- waitingToEmit.put(tp, waitingToEmitForTp.stream()
- .filter(record -> record.offset() >= committedOffset)
- .collect(Collectors.toCollection(LinkedList::new)));
- }
+ }
+ /**
+ * In some cases the waitingToEmit list may contain tuples that have just been committed. Drop these.
+ */
+ List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
+ if (waitingToEmitForTp != null) {
+ //Discard the pending records that are already committed
+ waitingToEmit.put(tp, waitingToEmitForTp.stream()
+ .filter(record -> record.offset() >= committedOffset)
+ .collect(Collectors.toCollection(LinkedList::new)));
}
final OffsetManager offsetManager = offsetManagers.get(tp);
@@ -570,11 +574,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
if (!emitted.contains(msgId)) {
LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
- + "came from a topic-partition that this consumer group instance is no longer tracking "
- + "due to rebalance/partition reassignment. No action taken.", msgId);
+ + "came from a topic-partition that this consumer group instance is no longer tracking "
+ + "due to rebalance/partition reassignment. No action taken.", msgId);
} else {
Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
- + " This should never occur barring errors in the RetryService implementation or the spout code.");
+ + " This should never occur barring errors in the RetryService implementation or the spout code.");
offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
emitted.remove(msgId);
}
@@ -691,7 +695,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
configuration.put(configKeyPrefix + "topics", getTopicsString());
configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
- for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
+ for (Entry<String, Object> conf : kafkaSpoutConfig.getKafkaProps().entrySet()) {
if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
} else {
@@ -710,11 +714,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
private boolean isWrapper(Class<?> type) {
return type == Double.class || type == Float.class || type == Long.class
- || type == Integer.class || type == Short.class || type == Character.class
- || type == Byte.class || type == Boolean.class || type == String.class;
+ || type == Integer.class || type == Short.class || type == Character.class
+ || type == Byte.class || type == Boolean.class || type == String.class;
}
-
private String getTopicsString() {
return kafkaSpoutConfig.getTopicFilter().getTopicsString();
}
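The first hunk above also clarifies the rebalance behavior: when a partition the spout already tracked is re-assigned to it, both the acked offsets (its OffsetManager) and the consumer position are left alone so consumption resumes where it left off; only genuinely new partitions get a seek and a fresh OffsetManager. A rough sketch of that guard, using plain strings and longs as hypothetical stand-ins for TopicPartition and OffsetManager:

import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ResumeOnReassignmentSketch {
    public static void main(String[] args) {
        Map<String, Long> offsetManagers = new HashMap<>(); // per-partition ack state
        offsetManagers.put("topic-0", 42L); // tracked before the rebalance

        Set<String> previousAssignment = new HashSet<>();
        previousAssignment.add("topic-0");

        Set<String> assigned = new HashSet<>();
        assigned.add("topic-0"); // re-assigned: resume where it left off
        assigned.add("topic-1"); // genuinely new: seek and start tracking

        // Only partitions that were not previously assigned are treated as new.
        Set<String> newPartitions = new HashSet<>(assigned);
        newPartitions.removeAll(previousAssignment);
        for (String tp : newPartitions) {
            long fetchOffset = 0L; // where the first-poll strategy's seek would land
            if (!offsetManagers.containsKey(tp)) {
                offsetManagers.put(tp, fetchOffset);
            }
        }
        System.out.println(offsetManagers); // {topic-0=42, topic-1=0}
    }
}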
http://git-wip-us.apache.org/repos/asf/storm/blob/8bb01856/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 5bec5b8..e9554d7 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -129,6 +129,7 @@ public class OffsetManager {
if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset",
currOffset, nextCommitOffset);
+ found = true;
nextCommitOffset = currOffset + 1;
} else {
LOG.debug("Topic-partition [{}] has non-sequential offset [{}]."
http://git-wip-us.apache.org/repos/asf/storm/blob/8bb01856/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
index 11861e2..d6531ca 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -217,5 +217,42 @@ public class KafkaSpoutLogCompactionSupportTest {
assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L));
}
}
+
+ @Test
+ public void testCommitTupleAfterCompactionGap() {
+ //If there is an acked tuple after a compaction gap, the spout should commit it immediately
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+
+ List<KafkaSpoutMessageId> firstMessage = SpoutWithMockedConsumerSetupHelper
+ .pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 0);
+ reset(collectorMock);
+
+ List<KafkaSpoutMessageId> messageAfterGap = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2);
+ reset(collectorMock);
+
+ spout.ack(firstMessage.get(0));
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+ verify(consumerMock).commitSync(commitCapture.capture());
+ Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+ assertThat(committed.keySet(), is(Collections.singleton(partition)));
+ assertThat("The consumer should have committed the offset before the gap",
+ committed.get(partition).offset(), is(1L));
+ reset(consumerMock);
+
+ spout.ack(messageAfterGap.get(0));
+
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ verify(consumerMock).commitSync(commitCapture.capture());
+ committed = commitCapture.getValue();
+ assertThat(committed.keySet(), is(Collections.singleton(partition)));
+ assertThat("The consumer should have committed the offset after the gap, since offset 1 wasn't emitted and both 0 and 2 are acked",
+ committed.get(partition).offset(), is(3L));
+ }
+ }
}
\ No newline at end of file
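The assertions in the test above lean on Kafka's commit convention: the committed offset names the next record to consume, not the last one processed. Acking offset 0 therefore commits offset 1, and once offset 2 is acked with offset 1 never emitted (compacted away), offset 3 becomes safe to commit. A tiny illustration against the real kafka-clients API; the topic name is hypothetical:

import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class CommitSemanticsSketch {
    public static void main(String[] args) {
        TopicPartition tp = new TopicPartition("test", 0);
        // After fully processing record 0, the next offset to consume is 1,
        // so that is the value handed to commitSync.
        Map<TopicPartition, OffsetAndMetadata> commit =
            Collections.singletonMap(tp, new OffsetAndMetadata(1L));
        System.out.println(commit); // {test-0=OffsetAndMetadata{offset=1, ...}}
        // With records 0 and 2 acked and offset 1 absent due to log
        // compaction, the spout can later commit offset 3 the same way.
    }
}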
http://git-wip-us.apache.org/repos/asf/storm/blob/8bb01856/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 84def11..512d274 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -118,6 +118,62 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
}
verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
}
+
+ @Test
+ public void testClearingWaitingToEmitIfConsumerPositionIsNotBehindWhenCommitting() throws Exception {
+ final int messageCountExcludingLast = maxPollRecords;
+ int messagesInKafka = messageCountExcludingLast + 1;
+ prepareSpout(messagesInKafka);
+
+ //Emit all messages and fail the first one while acking the rest
+ for (int i = 0; i < messageCountExcludingLast; i++) {
+ spout.nextTuple();
+ }
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(messageCountExcludingLast)).emit(anyString(), anyList(), messageIdCaptor.capture());
+ List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
+ for (int i = 1; i < messageIds.size(); i++) {
+ spout.ack(messageIds.get(i));
+ }
+ KafkaSpoutMessageId failedTuple = messageIds.get(0);
+ spout.fail(failedTuple);
+
+ //Advance the time and replay the failed tuple.
+ //Since the last tuple on the partition is more than maxPollRecords ahead of the failed tuple, it shouldn't be emitted here
+ reset(collectorMock);
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyString(), anyList(), failedIdReplayCaptor.capture());
+
+ assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple));
+
+ /* Ack the tuple, and commit.
+ *
+ * The waiting to emit list should now be cleared, and the next emitted tuple should be the last tuple on the partition,
+ * which hasn't been emitted yet
+ */
+ reset(collectorMock);
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
+ spout.ack(failedIdReplayCaptor.getValue());
+ spout.nextTuple();
+ verify(getKafkaConsumer()).commitSync(commitCapture.capture());
+
+ Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
+ TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+ assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
+ assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCountExcludingLast));
+
+ ArgumentCaptor<KafkaSpoutMessageId> lastOffsetMessageCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyString(), anyList(), lastOffsetMessageCaptor.capture());
+ assertThat("Expected emit of the final tuple in the partition", lastOffsetMessageCaptor.getValue().offset(), is(messagesInKafka - 1L));
+ reset(collectorMock);
+
+ //Nothing else should be emitted, all tuples are acked except for the final tuple, which is pending.
+ for (int i = 0; i < 3; i++) {
+ spout.nextTuple();
+ }
+ verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+ }
@Test
public void testShouldContinueWithSlowDoubleAcks() throws Exception {