You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/21 11:43:08 UTC

[1/4] storm git commit: STORM-2413: Make new Kafka spout respect tuple retry limit

Repository: storm
Updated Branches:
  refs/heads/1.x-branch 9a1ccd07c -> 7d904f0e6


STORM-2413: Make new Kafka spout respect tuple retry limit


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b49a126
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b49a126
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b49a126

Branch: refs/heads/1.x-branch
Commit: 1b49a126327bca92fc8b7db441598d6b38676137
Parents: 9a1ccd0
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Mar 14 22:12:50 2017 +0100
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:41:27 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  2 +-
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  4 +--
 .../KafkaSpoutRetryExponentialBackoff.java      | 14 +++++++++
 .../kafka/spout/KafkaSpoutRetryService.java     | 14 +++++++--
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  1 -
 .../src/test/resources/log4j2.xml               | 32 ++++++++++++++++++++
 6 files changed, 61 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 32542b9..58347e3 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -318,7 +318,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
      */
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
-        final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
         if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
         } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail

http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index dea57c4..a9163eb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
 public class KafkaSpoutMessageId {
-    private transient TopicPartition topicPart;
-    private transient long offset;
+    private final transient TopicPartition topicPart;
+    private final transient long offset;
     private transient int numFails = 0;
     private boolean emitted;   // true if the record was emitted using a form of collector.emit(...).
                                // false when skipping null tuples as configured by the user in KafkaSpoutConfig

http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 2584685..6b53779 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.utils.Time;
 
 /**
@@ -292,6 +293,19 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         return count;
     }
 
+    @Override
+    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        if (toRetryMsgs.contains(msgId)) {
+            for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+                if (originalMsgId.equals(msgId)) {
+                    return originalMsgId;
+                }
+            }
+        }
+        return msgId;
+    }
+
     // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
     private long nextTime(KafkaSpoutMessageId msgId) {
         final long currentTimeNanos = Time.nanoTime();

http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index f0230c3..5147752 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 /**
  * Represents the logic that manages the retrial of failed tuples.
@@ -77,6 +77,16 @@ public interface KafkaSpoutRetryService extends Serializable {
     * Returns false if this message is not scheduled for retrial
      */
     boolean isScheduled(KafkaSpoutMessageId msgId);
-    
+
+    /**
+     * @return The number of messages that are ready for retry
+     */
     int readyMessageCount();
+
+    /**
+     * Gets the {@link KafkaSpoutMessageId} for the given record.
+     * @param record The record to fetch the id for
+     * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry.
+     */
+    KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index d49516e..dc85123 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -46,7 +46,6 @@ public class KafkaSpoutTopologyMainNamedTopics {
     private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
     private static final String[] TOPICS = new String[]{"test","test1","test2"};
 
-
     public static void main(String[] args) throws Exception {
         new KafkaSpoutTopologyMainNamedTopics().runMain(args);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/resources/log4j2.xml b/external/storm-kafka-client/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..393dd2c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="WARN">
+            <AppenderRef ref="Console"/>
+        </Root>
+        <Logger name="org.apache.storm.kafka" level="INFO" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+    </Loggers>
+</Configuration>
\ No newline at end of file


[3/4] storm git commit: Merge branch 'STORM-2413-1.x-merge' into 1.x-branch

Posted by ka...@apache.org.
Merge branch 'STORM-2413-1.x-merge' into 1.x-branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b556860b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b556860b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b556860b

Branch: refs/heads/1.x-branch
Commit: b556860b27d7ce438caaa50557dc7675506c0eb5
Parents: 9a1ccd0 fa16019
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:42:09 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:42:09 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  2 +-
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  4 +--
 .../KafkaSpoutRetryExponentialBackoff.java      | 14 ++++++++
 .../kafka/spout/KafkaSpoutRetryService.java     | 14 ++++++--
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 14 ++++++--
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 34 +++++++++++++++++---
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  1 -
 .../src/test/resources/log4j2.xml               | 32 ++++++++++++++++++
 8 files changed, 103 insertions(+), 12 deletions(-)
----------------------------------------------------------------------



[4/4] storm git commit: STORM-2413: CHANGELOG

Posted by ka...@apache.org.
STORM-2413: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d904f0e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d904f0e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d904f0e

Branch: refs/heads/1.x-branch
Commit: 7d904f0e6babe5e9803937d13e9eca7d8e104431
Parents: b556860
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:42:24 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:42:24 2017 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/7d904f0e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f08441b..1e1cdaa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.1
+ * STORM-2413: Make new Kafka spout respect tuple retry limits
  * STORM-2518 Handles empty name for "USER type" ACL when normalizing
  * STORM-2501: Auto populate Hive Credentials
  * STORM-2520: AutoHDFS should prefer cluster-wise hdfs kerberos principal


[2/4] storm git commit: STORM-2413 Make new Kafka spout respect tuple retry limits

Posted by ka...@apache.org.
STORM-2413 Make new Kafka spout respect tuple retry limits

* fix failing tests only for 1.x branch


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa160198
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa160198
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa160198

Branch: refs/heads/1.x-branch
Commit: fa16019862b87cbeefd6575b257e11a4944c804a
Parents: 1b49a12
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:41:35 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:41:35 2017 +0900

----------------------------------------------------------------------
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 14 ++++++--
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 34 +++++++++++++++++---
 2 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fa160198/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 81e3807..bd6e582 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -17,6 +17,7 @@ package org.apache.storm.kafka.spout;
 
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.any;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
 import static org.mockito.Matchers.anyCollection;
@@ -59,15 +60,17 @@ public class KafkaSpoutRebalanceTest {
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 
     private final long offsetCommitPeriodMs = 2_000;
-    private final TopologyContext contextMock = mock(TopologyContext.class);
-    private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
+    private TopologyContext contextMock;
+    private SpoutOutputCollector collectorMock;
     private KafkaConsumer<String, String> consumerMock;
     private KafkaConsumerFactory<String, String> consumerFactoryMock;
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
+        contextMock = mock(TopologyContext.class);
+        collectorMock = mock(SpoutOutputCollector.class);
         consumerMock = mock(KafkaConsumer.class);
         consumerFactoryMock = new KafkaConsumerFactory<String, String>(){
             @Override
@@ -165,9 +168,16 @@ public class KafkaSpoutRebalanceTest {
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);
 
+        when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class)))
+            .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0))
+            .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
+        
         //Emit a message on each partition and revoke the first partition
         List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
 
+        //Check that only two message ids were generated
+        verify(retryServiceMock, times(2)).getMessageId(Mockito.any(ConsumerRecord.class));
+        
         //Fail both emitted tuples
         spout.fail(emittedMessageIds.get(0));
         spout.fail(emittedMessageIds.get(1));

http://git-wip-us.apache.org/repos/asf/storm/blob/fa160198/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 884709d..6042c80 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,7 +17,6 @@
  */
 package org.apache.storm.kafka.spout;
 
-
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -37,11 +36,14 @@ import java.util.concurrent.TimeoutException;
 
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyListOf;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
@@ -61,7 +63,6 @@ import org.junit.Before;
 import org.mockito.Captor;
 import org.mockito.MockitoAnnotations;
 
-
 public class SingleTopicKafkaSpoutTest {
 
     @Rule
@@ -77,12 +78,15 @@ public class SingleTopicKafkaSpoutTest {
     private KafkaConsumer<String, String> consumerSpy;
     private KafkaConsumerFactory<String, String> consumerFactory;
     private KafkaSpout<String, String> spout;
+    private int maxRetries = 3;
 
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
         KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+            .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+                maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
             .build();
         this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
         this.consumerFactory = new KafkaConsumerFactory<String, String>() {
@@ -100,8 +104,8 @@ public class SingleTopicKafkaSpoutTest {
 
         for (int i = 0; i < msgCount; i++) {
             ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
-                    topicName, Integer.toString(i),
-                    Integer.toString(i));
+                topicName, Integer.toString(i),
+                Integer.toString(i));
             kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
         }
     }
@@ -316,4 +320,26 @@ public class SingleTopicKafkaSpoutTest {
         expectedReemitIds.add(capturedMessageIds.get(2));
         assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
     }
+
+    @Test
+    public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+        //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
+        int messageCount = 1;
+        initializeSpout(messageCount);
+
+        //Emit and fail the same tuple until we've reached retry limit
+        for (int i = 0; i <= maxRetries; i++) {
+            ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+            spout.nextTuple();
+            verify(collector).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture());
+            KafkaSpoutMessageId msgId = messageIdFailed.getValue();
+            spout.fail(msgId);
+            assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
+            reset(collector);
+        }
+
+        //Verify that the tuple is not emitted again
+        spout.nextTuple();
+        verify(collector, never()).emit(anyString(), anyListOf(Object.class), anyObject());
+    }
 }