You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this rendering.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/21 11:42:58 UTC
[1/3] storm git commit: STORM-2413: Make new Kafka spout respect tuple retry limit
Repository: storm
Updated Branches:
refs/heads/master ca12db92c -> 779e56656
STORM-2413: Make new Kafka spout respect tuple retry limit
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/20a82602
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/20a82602
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/20a82602
Branch: refs/heads/master
Commit: 20a82602ffda6795ced9d34e413464d7720f660c
Parents: ca12db9
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Mar 14 22:12:50 2017 +0100
Committer: Stig Rohde Døssing <st...@gmail.com>
Committed: Sun May 21 13:15:47 2017 +0200
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 2 +-
.../storm/kafka/spout/KafkaSpoutMessageId.java | 4 +--
.../KafkaSpoutRetryExponentialBackoff.java | 14 +++++++++
.../kafka/spout/KafkaSpoutRetryService.java | 13 +++++++-
.../kafka/spout/KafkaSpoutRebalanceTest.java | 13 ++++++--
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 32 +++++++++++++++++---
.../test/KafkaSpoutTopologyMainNamedTopics.java | 1 -
.../src/test/resources/log4j2.xml | 32 ++++++++++++++++++++
8 files changed, 100 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 72b8d14..09795ed 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -332,7 +332,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
*/
private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
- final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+ final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index dd6411a..1a60723 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
public class KafkaSpoutMessageId {
- private transient TopicPartition topicPart;
- private transient long offset;
+ private final transient TopicPartition topicPart;
+ private final transient long offset;
private transient int numFails = 0;
/**
* true if the record was emitted using a form of collector.emit(...). false
http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 60c34dc..60a707d 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -28,6 +28,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.utils.Time;
import org.slf4j.Logger;
@@ -287,6 +288,19 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
return count;
}
+ @Override
+ public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+ KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+ if (toRetryMsgs.contains(msgId)) {
+ for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+ if (originalMsgId.equals(msgId)) {
+ return originalMsgId;
+ }
+ }
+ }
+ return msgId;
+ }
+
// if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
private long nextTime(KafkaSpoutMessageId msgId) {
final long currentTimeNanos = Time.nanoTime();
http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 04e4ae7..a1caf2c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -22,6 +22,7 @@ import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Represents the logic that manages the retrial of failed tuples.
@@ -75,6 +76,16 @@ public interface KafkaSpoutRetryService extends Serializable {
* Returns false is this message is not scheduled for retrial
*/
boolean isScheduled(KafkaSpoutMessageId msgId);
-
+
+ /**
+ * @return The number of messages that are ready for retry
+ */
int readyMessageCount();
+
+ /**
+ * Gets the {@link KafkaSpoutMessageId} for the given record.
+ * @param record The record to fetch the id for
+ * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry.
+ */
+ KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index cdae4dd..2d55520 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -58,15 +58,17 @@ public class KafkaSpoutRebalanceTest {
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
private final long offsetCommitPeriodMs = 2_000;
- private final TopologyContext contextMock = mock(TopologyContext.class);
- private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
private final Map<String, Object> conf = new HashMap<>();
+ private TopologyContext contextMock;
+ private SpoutOutputCollector collectorMock;
private KafkaConsumer<String, String> consumerMock;
private KafkaConsumerFactory<String, String> consumerFactory;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ contextMock = mock(TopologyContext.class);
+ collectorMock = mock(SpoutOutputCollector.class);
consumerMock = mock(KafkaConsumer.class);
consumerFactory = (kafkaSpoutConfig) -> consumerMock;
}
@@ -159,9 +161,16 @@ public class KafkaSpoutRebalanceTest {
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);
+ when(retryServiceMock.getMessageId(anyObject()))
+ .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0))
+ .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
+
//Emit a message on each partition and revoke the first partition
List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+ //Check that only two message ids were generated
+ verify(retryServiceMock, times(2)).getMessageId(anyObject());
+
//Fail both emitted tuples
spout.fail(emittedMessageIds.get(0));
spout.fail(emittedMessageIds.get(1));
http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index ecbab87..7f0973b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,13 +17,13 @@
*/
package org.apache.storm.kafka.spout;
-
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -57,7 +57,6 @@ import org.mockito.ArgumentCaptor;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
-
public class SingleTopicKafkaSpoutTest {
@Rule
@@ -73,12 +72,15 @@ public class SingleTopicKafkaSpoutTest {
private KafkaConsumer<String, String> consumerSpy;
private KafkaConsumerFactory<String, String> consumerFactory;
private KafkaSpout<String, String> spout;
+ private int maxRetries = 3;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+ .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+ maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
.build();
this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
@@ -90,8 +92,8 @@ public class SingleTopicKafkaSpoutTest {
for (int i = 0; i < msgCount; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
- topicName, Integer.toString(i),
- Integer.toString(i));
+ topicName, Integer.toString(i),
+ Integer.toString(i));
kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
}
}
@@ -300,4 +302,26 @@ public class SingleTopicKafkaSpoutTest {
expectedReemitIds.add(capturedMessageIds.get(2));
assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
}
+
+ @Test
+ public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+ //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
+ int messageCount = 1;
+ initializeSpout(messageCount);
+
+ //Emit and fail the same tuple until we've reached retry limit
+ for (int i = 0; i <= maxRetries; i++) {
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ spout.nextTuple();
+ verify(collector).emit(anyObject(), anyObject(), messageIdFailed.capture());
+ KafkaSpoutMessageId msgId = messageIdFailed.getValue();
+ spout.fail(msgId);
+ assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
+ reset(collector);
+ }
+
+ //Verify that the tuple is not emitted again
+ spout.nextTuple();
+ verify(collector, never()).emit(anyObject(), anyObject(), anyObject());
+ }
}
http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index 2492e33..9459d4b 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -42,7 +42,6 @@ public class KafkaSpoutTopologyMainNamedTopics {
private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
private static final String[] TOPICS = new String[]{"test","test1","test2"};
-
public static void main(String[] args) throws Exception {
new KafkaSpoutTopologyMainNamedTopics().runMain(args);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/20a82602/external/storm-kafka-client/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/resources/log4j2.xml b/external/storm-kafka-client/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..393dd2c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="WARN">
+ <AppenderRef ref="Console"/>
+ </Root>
+ <Logger name="org.apache.storm.kafka" level="INFO" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ </Loggers>
+</Configuration>
\ No newline at end of file
[2/3] storm git commit: Merge branch 'STORM-2413' of https://github.com/srdo/storm into STORM-2413-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2413' of https://github.com/srdo/storm into STORM-2413-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/07ca978c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/07ca978c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/07ca978c
Branch: refs/heads/master
Commit: 07ca978c78846c9159d4e132d46c62465e82ec84
Parents: ca12db9 20a8260
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:18:45 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:18:45 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 2 +-
.../storm/kafka/spout/KafkaSpoutMessageId.java | 4 +--
.../KafkaSpoutRetryExponentialBackoff.java | 14 +++++++++
.../kafka/spout/KafkaSpoutRetryService.java | 13 +++++++-
.../kafka/spout/KafkaSpoutRebalanceTest.java | 13 ++++++--
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 32 +++++++++++++++++---
.../test/KafkaSpoutTopologyMainNamedTopics.java | 1 -
.../src/test/resources/log4j2.xml | 32 ++++++++++++++++++++
8 files changed, 100 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
[3/3] storm git commit: STORM-2413: CHANGELOG
Posted by ka...@apache.org.
STORM-2413: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/779e5665
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/779e5665
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/779e5665
Branch: refs/heads/master
Commit: 779e5665696f5f10cf9674283420278f38c7a05c
Parents: 07ca978
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:42:49 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:42:49 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/779e5665/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 57789ef..86d4db3 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -250,6 +250,7 @@
* STORM-1769: Added a test to check local nimbus with notifier plugin
## 1.1.1
+ * STORM-2413: Make new Kafka spout respect tuple retry limits
* STORM-2518: Handles empty name for "USER type" ACL when normalizing ACLs
* STORM-2501: Auto populate Hive Credentials
* STORM-2496: Dependency artifacts should be uploaded to blobstore with READ permission for all