You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/21 11:42:58 UTC

[1/3] storm git commit: STORM-2413: Make new Kafka spout respect tuple retry limit

Repository: storm
Updated Branches:
  refs/heads/master ca12db92c -> 779e56656


STORM-2413: Make new Kafka spout respect tuple retry limit


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/20a82602
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/20a82602
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/20a82602

Branch: refs/heads/master
Commit: 20a82602ffda6795ced9d34e413464d7720f660c
Parents: ca12db9
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Mar 14 22:12:50 2017 +0100
Committer: Stig Rohde Døssing <st...@gmail.com>
Committed: Sun May 21 13:15:47 2017 +0200

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  2 +-
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  4 +--
 .../KafkaSpoutRetryExponentialBackoff.java      | 14 +++++++++
 .../kafka/spout/KafkaSpoutRetryService.java     | 13 +++++++-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 13 ++++++--
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 32 +++++++++++++++++---
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  1 -
 .../src/test/resources/log4j2.xml               | 32 ++++++++++++++++++++
 8 files changed, 100 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 72b8d14..09795ed 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -332,7 +332,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
      */
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
-        final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
         if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
         } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail

http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index dd6411a..1a60723 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
 public class KafkaSpoutMessageId {
-    private transient TopicPartition topicPart;
-    private transient long offset;
+    private final transient TopicPartition topicPart;
+    private final transient long offset;
     private transient int numFails = 0;
     /**
      * true if the record was emitted using a form of collector.emit(...). false

http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 60c34dc..60a707d 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -28,6 +28,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.utils.Time;
 import org.slf4j.Logger;
@@ -287,6 +288,19 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         return count;
     }
 
+    @Override
+    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        if (toRetryMsgs.contains(msgId)) {
+            for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+                if (originalMsgId.equals(msgId)) {
+                    return originalMsgId;
+                }
+            }
+        }
+        return msgId;
+    }
+
     // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
     private long nextTime(KafkaSpoutMessageId msgId) {
         final long currentTimeNanos = Time.nanoTime();

http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 04e4ae7..a1caf2c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 /**
  * Represents the logic that manages the retrial of failed tuples.
@@ -75,6 +76,16 @@ public interface KafkaSpoutRetryService extends Serializable {
      *     Returns false is this message is not scheduled for retrial
      */
     boolean isScheduled(KafkaSpoutMessageId msgId);
-    
+
+    /**
+     * @return The number of messages that are ready for retry
+     */
     int readyMessageCount();
+
+    /**
+     * Gets the {@link KafkaSpoutMessageId} for the given record.
+     * @param record The record to fetch the id for
+     * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry.
+     */
+    KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index cdae4dd..2d55520 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -58,15 +58,17 @@ public class KafkaSpoutRebalanceTest {
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 
     private final long offsetCommitPeriodMs = 2_000;
-    private final TopologyContext contextMock = mock(TopologyContext.class);
-    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
+    private TopologyContext contextMock;
+    private SpoutOutputCollector collectorMock;
     private KafkaConsumer<String, String> consumerMock;
     private KafkaConsumerFactory<String, String> consumerFactory;
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
+        contextMock = mock(TopologyContext.class);
+        collectorMock = mock(SpoutOutputCollector.class);
         consumerMock = mock(KafkaConsumer.class);
         consumerFactory = (kafkaSpoutConfig) -> consumerMock;
     }
@@ -159,9 +161,16 @@ public class KafkaSpoutRebalanceTest {
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);
 
+        when(retryServiceMock.getMessageId(anyObject()))
+            .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0))
+            .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
+        
         //Emit a message on each partition and revoke the first partition
         List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
 
+        //Check that only two message ids were generated
+        verify(retryServiceMock, times(2)).getMessageId(anyObject());
+        
         //Fail both emitted tuples
         spout.fail(emittedMessageIds.get(0));
         spout.fail(emittedMessageIds.get(1));

http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index ecbab87..7f0973b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,13 +17,13 @@
  */
 package org.apache.storm.kafka.spout;
 
-
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -57,7 +57,6 @@ import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
-
 public class SingleTopicKafkaSpoutTest {
 
     @Rule
@@ -73,12 +72,15 @@ public class SingleTopicKafkaSpoutTest {
     private KafkaConsumer<String, String> consumerSpy;
     private KafkaConsumerFactory<String, String> consumerFactory;
     private KafkaSpout<String, String> spout;
+    private int maxRetries = 3;
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
         KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+                maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
             .build();
         this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
         this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
@@ -90,8 +92,8 @@ public class SingleTopicKafkaSpoutTest {
 
         for (int i = 0; i < msgCount; i++) {
             ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
-                    topicName, Integer.toString(i),
-                    Integer.toString(i));
+                topicName, Integer.toString(i),
+                Integer.toString(i));
             kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
         }
     }
@@ -300,4 +302,26 @@ public class SingleTopicKafkaSpoutTest {
         expectedReemitIds.add(capturedMessageIds.get(2));
         assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
     }
+
+    @Test
+    public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+        //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
+        int messageCount = 1;
+        initializeSpout(messageCount);
+
+        //Emit and fail the same tuple until we've reached retry limit
+        for (int i = 0; i <= maxRetries; i++) {
+            ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            spout.nextTuple();
+            verify(collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+            KafkaSpoutMessageId msgId = messageIdFailed.getValue();
+            spout.fail(msgId);
+            assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
+            reset(collector);
+        }
+
+        //Verify that the tuple is not emitted again
+        spout.nextTuple();
+        verify(collector, never()).emit(anyObject(), anyObject(), anyObject());
+    }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 2492e33..9459d4b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -42,7 +42,6 @@ public class KafkaSpoutTopologyMainNamedTopics {
     private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
     private static final String[] TOPICS = new String[]{"test","test1","test2"};
 
-
     public static void main(String[] args) throws Exception {
         new KafkaSpoutTopologyMainNamedTopics().runMain(args);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/resources/log4j2.xml b/external/storm-kafka-client/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..393dd2c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="WARN">
+            <AppenderRef ref="Console"/>
+        </Root>
+        <Logger name="org.apache.storm.kafka" level="INFO" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+    </Loggers>
+</Configuration>
\ No newline at end of file


[2/3] storm git commit: Merge branch 'STORM-2413' of https://github.com/srdo/storm into STORM-2413-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2413' of https://github.com/srdo/storm into STORM-2413-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07ca978c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07ca978c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07ca978c

Branch: refs/heads/master
Commit: 07ca978c78846c9159d4e132d46c62465e82ec84
Parents: ca12db9 20a8260
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:18:45 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:18:45 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  2 +-
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  4 +--
 .../KafkaSpoutRetryExponentialBackoff.java      | 14 +++++++++
 .../kafka/spout/KafkaSpoutRetryService.java     | 13 +++++++-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 13 ++++++--
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 32 +++++++++++++++++---
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  1 -
 .../src/test/resources/log4j2.xml               | 32 ++++++++++++++++++++
 8 files changed, 100 insertions(+), 11 deletions(-)
----------------------------------------------------------------------



[3/3] storm git commit: STORM-2413: CHANGELOG

Posted by ka...@apache.org.
STORM-2413: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/779e5665
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/779e5665
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/779e5665

Branch: refs/heads/master
Commit: 779e5665696f5f10cf9674283420278f38c7a05c
Parents: 07ca978
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:42:49 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:42:49 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/779e5665/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 57789ef..86d4db3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -250,6 +250,7 @@
  * STORM-1769: Added a test to check local nimbus with notifier plugin
 
 ## 1.1.1
+ * STORM-2413: Make new Kafka spout respect tuple retry limits
  * STORM-2518: Handles empty name for "USER type" ACL when normalizing ACLs
  * STORM-2501: Auto populate Hive Credentials
  * STORM-2496: Dependency artifacts should be uploaded to blobstore with READ permission for all