Posted to commits@storm.apache.org by sr...@apache.org on 2018/12/11 18:39:00 UTC

[4/8] storm git commit: STORM-3301: Fix case where KafkaSpout could emit tuples that were already committed

STORM-3301: Fix case where KafkaSpout could emit tuples that were already committed


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8bb01856
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8bb01856
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8bb01856

Branch: refs/heads/master
Commit: 8bb0185651dfd0e085d9dda4bc0298a1bd8742fb
Parents: 94cd157
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Sat Dec 8 22:15:04 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sun Dec 9 11:54:41 2018 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    | 35 ++++++------
 .../kafka/spout/internal/OffsetManager.java     |  1 +
 .../KafkaSpoutLogCompactionSupportTest.java     | 37 +++++++++++++
 .../kafka/spout/KafkaSpoutSingleTopicTest.java  | 56 ++++++++++++++++++++
 4 files changed, 113 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8bb01856/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 5ebef80..27a531c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -215,13 +215,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             waitingToEmit.keySet().retainAll(partitions);
 
             Set<TopicPartition> newPartitions = new HashSet<>(partitions);
+            // If this partition was previously assigned to this spout,
+            // leave the acked offsets and consumer position as they were to resume where it left off
             newPartitions.removeAll(previousAssignment);
             for (TopicPartition newTp : newPartitions) {
                 final OffsetAndMetadata committedOffset = consumer.committed(newTp);
                 final long fetchOffset = doSeek(newTp, committedOffset);
                 LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
                     fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
-                // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
                 if (isAtLeastOnceProcessing() && !offsetManagers.containsKey(newTp)) {
                     offsetManagers.put(newTp, new OffsetManager(newTp, fetchOffset));
                 }
@@ -526,18 +527,21 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                      * The position is behind the committed offset. This can happen in some cases, e.g. if a message failed, lots of (more
                      * than max.poll.records) later messages were acked, and the failed message then gets acked. The consumer may only be
                      * part way through "catching up" to where it was when it went back to retry the failed tuple. Skip the consumer forward
-                     * to the committed offset and drop the current waiting to emit list, since it'll likely contain committed offsets.
+                     * to the committed offset.
                      */
                     LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
                         position, committedOffset);
                     consumer.seek(tp, committedOffset);
-                    List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
-                    if (waitingToEmitForTp != null) {
-                        //Discard the pending records that are already committed
-                        waitingToEmit.put(tp, waitingToEmitForTp.stream()
-                            .filter(record -> record.offset() >= committedOffset)
-                            .collect(Collectors.toCollection(LinkedList::new)));
-                    }
+                }
+                /**
+                 * In some cases the waitingToEmit list may contain tuples that have just been committed. Drop these.
+                 */
+                List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
+                if (waitingToEmitForTp != null) {
+                    //Discard the pending records that are already committed
+                    waitingToEmit.put(tp, waitingToEmitForTp.stream()
+                        .filter(record -> record.offset() >= committedOffset)
+                        .collect(Collectors.toCollection(LinkedList::new)));
                 }
 
                 final OffsetManager offsetManager = offsetManagers.get(tp);
@@ -570,11 +574,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         if (!emitted.contains(msgId)) {
             LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
-                        + "came from a topic-partition that this consumer group instance is no longer tracking "
-                        + "due to rebalance/partition reassignment. No action taken.", msgId);
+                + "came from a topic-partition that this consumer group instance is no longer tracking "
+                + "due to rebalance/partition reassignment. No action taken.", msgId);
         } else {
             Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
-                        + " This should never occur barring errors in the RetryService implementation or the spout code.");
+                + " This should never occur barring errors in the RetryService implementation or the spout code.");
             offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
             emitted.remove(msgId);
         }
@@ -691,7 +695,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         configuration.put(configKeyPrefix + "topics", getTopicsString());
 
         configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
-        for (Entry<String, Object> conf: kafkaSpoutConfig.getKafkaProps().entrySet()) {
+        for (Entry<String, Object> conf : kafkaSpoutConfig.getKafkaProps().entrySet()) {
             if (conf.getValue() != null && isPrimitiveOrWrapper(conf.getValue().getClass())) {
                 configuration.put(configKeyPrefix + conf.getKey(), conf.getValue());
             } else {
@@ -710,11 +714,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private boolean isWrapper(Class<?> type) {
         return type == Double.class || type == Float.class || type == Long.class
-                || type == Integer.class || type == Short.class || type == Character.class
-                || type == Byte.class || type == Boolean.class || type == String.class;
+            || type == Integer.class || type == Short.class || type == Character.class
+            || type == Byte.class || type == Boolean.class || type == String.class;
     }
 
-
     private String getTopicsString() {
         return kafkaSpoutConfig.getTopicFilter().getTopicsString();
     }
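
For readers following the change: the substance of the fix in KafkaSpout.java is that the waitingToEmit pruning now runs on every commit pass, rather than only when the consumer position had fallen behind the committed offset, so pending records at offsets below the committed offset can never be re-emitted. A minimal standalone sketch of that pruning step, using the same names as the diff (the wrapper class and helper method are illustrative, not part of KafkaSpout):

    import java.util.LinkedList;
    import java.util.List;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.ConsumerRecord;

    class WaitingToEmitPruneSketch {
        // Illustrative helper, not part of KafkaSpout: keep only the pending
        // records at or beyond the committed offset, so already-committed
        // tuples are never handed to the collector again.
        static <K, V> List<ConsumerRecord<K, V>> pruneCommitted(
                List<ConsumerRecord<K, V>> waitingToEmitForTp, long committedOffset) {
            return waitingToEmitForTp.stream()
                .filter(record -> record.offset() >= committedOffset)
                .collect(Collectors.toCollection(LinkedList::new));
        }
    }

The new test testClearingWaitingToEmitIfConsumerPositionIsNotBehindWhenCommitting below exercises exactly this path: it commits while the consumer position is not behind the committed offset and verifies that no already-committed tuple is emitted again.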

http://git-wip-us.apache.org/repos/asf/storm/blob/8bb01856/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 5bec5b8..e9554d7 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -129,6 +129,7 @@ public class OffsetManager {
                     if (nextEmittedOffset != null && currOffset == nextEmittedOffset) {
                         LOG.debug("Found committable offset: [{}] after missing offset: [{}], skipping to the committable offset",
                             currOffset, nextCommitOffset);
+                        found = true;
                         nextCommitOffset = currOffset + 1;
                     } else {
                         LOG.debug("Topic-partition [{}] has non-sequential offset [{}]."

http://git-wip-us.apache.org/repos/asf/storm/blob/8bb01856/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
index 11861e2..d6531ca 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -217,5 +217,42 @@ public class KafkaSpoutLogCompactionSupportTest {
             assertThat("The first partition should have committed all offsets", committed.get(partition).offset(), is(3L));
         }
     }
+    
+    @Test
+    public void testCommitTupleAfterCompactionGap() {
+        //If there is an acked tuple after a compaction gap, the spout should commit it immediately
+        try (SimulatedTime simulatedTime = new SimulatedTime()) {
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
+            
+            List<KafkaSpoutMessageId> firstMessage = SpoutWithMockedConsumerSetupHelper
+                .pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 0);
+            reset(collectorMock);
+            
+            List<KafkaSpoutMessageId> messageAfterGap = SpoutWithMockedConsumerSetupHelper.pollAndEmit(spout, consumerMock, 1, collectorMock, partition, 2);
+            reset(collectorMock);
+            
+            spout.ack(firstMessage.get(0));
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            verify(consumerMock).commitSync(commitCapture.capture());
+            Map<TopicPartition, OffsetAndMetadata> committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The consumer should have committed the offset before the gap",
+                committed.get(partition).offset(), is(1L));
+            reset(consumerMock);
+            
+            spout.ack(messageAfterGap.get(0));
+            
+            Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+            spout.nextTuple();
+            
+            verify(consumerMock).commitSync(commitCapture.capture());
+            committed = commitCapture.getValue();
+            assertThat(committed.keySet(), is(Collections.singleton(partition)));
+            assertThat("The consumer should have committed the offset after the gap, since offset 1 wasn't emitted and both 0 and 2 are acked",
+                committed.get(partition).offset(), is(3L));
+        }
+    }
 
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/8bb01856/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
index 84def11..512d274 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutSingleTopicTest.java
@@ -118,6 +118,62 @@ public class KafkaSpoutSingleTopicTest extends KafkaSpoutAbstractTest {
         }
         verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
     }
+    
+    @Test
+    public void testClearingWaitingToEmitIfConsumerPositionIsNotBehindWhenCommitting() throws Exception {
+        final int messageCountExcludingLast = maxPollRecords;
+        int messagesInKafka = messageCountExcludingLast + 1;
+        prepareSpout(messagesInKafka);
+
+        //Emit all messages and fail the first one while acking the rest
+        for (int i = 0; i < messageCountExcludingLast; i++) {
+            spout.nextTuple();
+        }
+        ArgumentCaptor<KafkaSpoutMessageId> messageIdCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock, times(messageCountExcludingLast)).emit(anyString(), anyList(), messageIdCaptor.capture());
+        List<KafkaSpoutMessageId> messageIds = messageIdCaptor.getAllValues();
+        for (int i = 1; i < messageIds.size(); i++) {
+            spout.ack(messageIds.get(i));
+        }
+        KafkaSpoutMessageId failedTuple = messageIds.get(0);
+        spout.fail(failedTuple);
+
+        //Advance the time and replay the failed tuple. 
+        //Since the last tuple on the partition is more than maxPollRecords ahead of the failed tuple, it shouldn't be emitted here
+        reset(collectorMock);
+        spout.nextTuple();
+        ArgumentCaptor<KafkaSpoutMessageId> failedIdReplayCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(anyString(), anyList(), failedIdReplayCaptor.capture());
+
+        assertThat("Expected replay of failed tuple", failedIdReplayCaptor.getValue(), is(failedTuple));
+
+        /* Ack the tuple, and commit.
+         * 
+         * The waiting to emit list should now be cleared, and the next emitted tuple should be the last tuple on the partition,
+         * which hasn't been emitted yet
+         */
+        reset(collectorMock);
+        Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + commitOffsetPeriodMs);
+        spout.ack(failedIdReplayCaptor.getValue());
+        spout.nextTuple();
+        verify(getKafkaConsumer()).commitSync(commitCapture.capture());
+
+        Map<TopicPartition, OffsetAndMetadata> capturedCommit = commitCapture.getValue();
+        TopicPartition expectedTp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        assertThat("Should have committed to the right topic", capturedCommit, Matchers.hasKey(expectedTp));
+        assertThat("Should have committed all the acked messages", capturedCommit.get(expectedTp).offset(), is((long)messageCountExcludingLast));
+        
+        ArgumentCaptor<KafkaSpoutMessageId> lastOffsetMessageCaptor = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(anyString(), anyList(), lastOffsetMessageCaptor.capture());
+        assertThat("Expected emit of the final tuple in the partition", lastOffsetMessageCaptor.getValue().offset(), is(messagesInKafka - 1L));
+        reset(collectorMock);
+
+        //Nothing else should be emitted, all tuples are acked except for the final tuple, which is pending.
+        for(int i = 0; i < 3; i++) {
+            spout.nextTuple();
+        }
+        verify(collectorMock, never()).emit(anyString(), anyList(), anyObject());
+    }
 
     @Test
     public void testShouldContinueWithSlowDoubleAcks() throws Exception {