You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/03 14:41:44 UTC

[26/40] storm git commit: STORM-2413: Make new Kafka spout respect tuple retry limit

STORM-2413: Make new Kafka spout respect tuple retry limit


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fae07d96
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fae07d96
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fae07d96

Branch: refs/heads/1.1.x-branch
Commit: fae07d965753330996b7e7427b1cef32298e2086
Parents: bb908f6
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Mar 14 22:12:50 2017 +0100
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 29 16:50:14 2017 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  2 +-
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  4 +--
 .../KafkaSpoutRetryExponentialBackoff.java      | 14 +++++++++
 .../kafka/spout/KafkaSpoutRetryService.java     | 14 +++++++--
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  1 -
 .../src/test/resources/log4j2.xml               | 32 ++++++++++++++++++++
 6 files changed, 61 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 32542b9..58347e3 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -318,7 +318,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
      */
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
         final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
-        final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
         if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) {   // has been acked
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
         } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail

http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index dea57c4..a9163eb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
 public class KafkaSpoutMessageId {
-    private transient TopicPartition topicPart;
-    private transient long offset;
+    private final transient TopicPartition topicPart;
+    private final transient long offset;
     private transient int numFails = 0;
     private boolean emitted;   // true if the record was emitted using a form of collector.emit(...).
                                // false when skipping null tuples as configured by the user in KafkaSpoutConfig

http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 2584685..6b53779 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -32,6 +32,7 @@ import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.utils.Time;
 
 /**
@@ -292,6 +293,19 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         return count;
     }
 
+    @Override
+    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        if (toRetryMsgs.contains(msgId)) {
+            for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+                if (originalMsgId.equals(msgId)) {
+                    return originalMsgId;
+                }
+            }
+        }
+        return msgId;
+    }
+
     // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
     private long nextTime(KafkaSpoutMessageId msgId) {
         final long currentTimeNanos = Time.nanoTime();

http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index f0230c3..5147752 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Map;
-import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 /**
  * Represents the logic that manages the retrial of failed tuples.
@@ -77,6 +77,16 @@ public interface KafkaSpoutRetryService extends Serializable {
      * Returns false is this message is not scheduled for retrial
      */
     boolean isScheduled(KafkaSpoutMessageId msgId);
-    
+
+    /**
+     * @return The number of messages that are ready for retry
+     */
     int readyMessageCount();
+
+    /**
+     * Gets the {@link KafkaSpoutMessageId} for the given record.
+     * @param record The record to fetch the id for
+     * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry.
+     */
+    KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index d49516e..dc85123 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -46,7 +46,6 @@ public class KafkaSpoutTopologyMainNamedTopics {
     private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
     private static final String[] TOPICS = new String[]{"test","test1","test2"};
 
-
     public static void main(String[] args) throws Exception {
         new KafkaSpoutTopologyMainNamedTopics().runMain(args);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/resources/log4j2.xml b/external/storm-kafka-client/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..393dd2c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements.  See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License.  You may obtain a copy of the License at
+
+     http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+    <Appenders>
+        <Console name="Console" target="SYSTEM_OUT">
+            <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+        </Console>
+    </Appenders>
+    <Loggers>
+        <Root level="WARN">
+            <AppenderRef ref="Console"/>
+        </Root>
+        <Logger name="org.apache.storm.kafka" level="INFO" additivity="false">
+            <AppenderRef ref="Console"/>
+        </Logger>
+    </Loggers>
+</Configuration>
\ No newline at end of file