You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this copy.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/03 14:41:44 UTC
[26/40] storm git commit: STORM-2413: Make new Kafka spout respect tuple retry limit
STORM-2413: Make new Kafka spout respect tuple retry limit
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fae07d96
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fae07d96
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fae07d96
Branch: refs/heads/1.1.x-branch
Commit: fae07d965753330996b7e7427b1cef32298e2086
Parents: bb908f6
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Mar 14 22:12:50 2017 +0100
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Jun 29 16:50:14 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 2 +-
.../storm/kafka/spout/KafkaSpoutMessageId.java | 4 +--
.../KafkaSpoutRetryExponentialBackoff.java | 14 +++++++++
.../kafka/spout/KafkaSpoutRetryService.java | 14 +++++++--
.../test/KafkaSpoutTopologyMainNamedTopics.java | 1 -
.../src/test/resources/log4j2.xml | 32 ++++++++++++++++++++
6 files changed, 61 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 32542b9..58347e3 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -318,7 +318,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
*/
private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
- final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+ final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index dea57c4..a9163eb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
public class KafkaSpoutMessageId {
- private transient TopicPartition topicPart;
- private transient long offset;
+ private final transient TopicPartition topicPart;
+ private final transient long offset;
private transient int numFails = 0;
private boolean emitted; // true if the record was emitted using a form of collector.emit(...).
// false when skipping null tuples as configured by the user in KafkaSpoutConfig
http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 2584685..6b53779 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.utils.Time;
/**
@@ -292,6 +293,19 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
return count;
}
+ @Override
+ public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+ KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+ if (toRetryMsgs.contains(msgId)) {
+ for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+ if (originalMsgId.equals(msgId)) {
+ return originalMsgId;
+ }
+ }
+ }
+ return msgId;
+ }
+
// if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
private long nextTime(KafkaSpoutMessageId msgId) {
final long currentTimeNanos = Time.nanoTime();
http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index f0230c3..5147752 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Represents the logic that manages the retrial of failed tuples.
@@ -77,6 +77,16 @@ public interface KafkaSpoutRetryService extends Serializable {
* Returns false is this message is not scheduled for retrial
*/
boolean isScheduled(KafkaSpoutMessageId msgId);
-
+
+ /**
+ * @return The number of messages that are ready for retry
+ */
int readyMessageCount();
+
+ /**
+ * Gets the {@link KafkaSpoutMessageId} for the given record.
+ * @param record The record to fetch the id for
+ * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry.
+ */
+ KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index d49516e..dc85123 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -46,7 +46,6 @@ public class KafkaSpoutTopologyMainNamedTopics {
private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
private static final String[] TOPICS = new String[]{"test","test1","test2"};
-
public static void main(String[] args) throws Exception {
new KafkaSpoutTopologyMainNamedTopics().runMain(args);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/fae07d96/external/storm-kafka-client/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/resources/log4j2.xml b/external/storm-kafka-client/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..393dd2c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="WARN">
+ <AppenderRef ref="Console"/>
+ </Root>
+ <Logger name="org.apache.storm.kafka" level="INFO" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ </Loggers>
+</Configuration>
\ No newline at end of file