You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/05/21 11:43:08 UTC
[1/4] storm git commit: STORM-2413: Make new Kafka spout respect tuple retry limit
Repository: storm
Updated Branches:
refs/heads/1.x-branch 9a1ccd07c -> 7d904f0e6
STORM-2413: Make new Kafka spout respect tuple retry limit
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1b49a126
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1b49a126
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1b49a126
Branch: refs/heads/1.x-branch
Commit: 1b49a126327bca92fc8b7db441598d6b38676137
Parents: 9a1ccd0
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Mar 14 22:12:50 2017 +0100
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:41:27 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 2 +-
.../storm/kafka/spout/KafkaSpoutMessageId.java | 4 +--
.../KafkaSpoutRetryExponentialBackoff.java | 14 +++++++++
.../kafka/spout/KafkaSpoutRetryService.java | 14 +++++++--
.../test/KafkaSpoutTopologyMainNamedTopics.java | 1 -
.../src/test/resources/log4j2.xml | 32 ++++++++++++++++++++
6 files changed, 61 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 32542b9..58347e3 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -318,7 +318,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
*/
private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
- final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+ final KafkaSpoutMessageId msgId = retryService.getMessageId(record);
if (offsetManagers.containsKey(tp) && offsetManagers.get(tp).contains(msgId)) { // has been acked
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index dea57c4..a9163eb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -22,8 +22,8 @@ import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
public class KafkaSpoutMessageId {
- private transient TopicPartition topicPart;
- private transient long offset;
+ private final transient TopicPartition topicPart;
+ private final transient long offset;
private transient int numFails = 0;
private boolean emitted; // true if the record was emitted using a form of collector.emit(...).
// false when skipping null tuples as configured by the user in KafkaSpoutConfig
http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index 2584685..6b53779 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.storm.utils.Time;
/**
@@ -292,6 +293,19 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
return count;
}
+ @Override
+ public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+ KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+ if (toRetryMsgs.contains(msgId)) {
+ for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+ if (originalMsgId.equals(msgId)) {
+ return originalMsgId;
+ }
+ }
+ }
+ return msgId;
+ }
+
// if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
private long nextTime(KafkaSpoutMessageId msgId) {
final long currentTimeNanos = Time.nanoTime();
http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index f0230c3..5147752 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -23,7 +23,7 @@ import org.apache.kafka.common.TopicPartition;
import java.io.Serializable;
import java.util.Collection;
import java.util.Map;
-import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
/**
* Represents the logic that manages the retrial of failed tuples.
@@ -77,6 +77,16 @@ public interface KafkaSpoutRetryService extends Serializable {
* Returns false if this message is not scheduled for retrial
*/
boolean isScheduled(KafkaSpoutMessageId msgId);
-
+
+ /**
+ * @return The number of messages that are ready for retry
+ */
int readyMessageCount();
+
+ /**
+ * Gets the {@link KafkaSpoutMessageId} for the given record.
+ * @param record The record to fetch the id for
+ * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for retry.
+ */
+ KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
index d49516e..dc85123 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMainNamedTopics.java
@@ -46,7 +46,6 @@ public class KafkaSpoutTopologyMainNamedTopics {
private static final String TOPIC_0_1_STREAM = "test_0_1_stream";
private static final String[] TOPICS = new String[]{"test","test1","test2"};
-
public static void main(String[] args) throws Exception {
new KafkaSpoutTopologyMainNamedTopics().runMain(args);
}
http://git-wip-us.apache.org/repos/asf/storm/blob/1b49a126/external/storm-kafka-client/src/test/resources/log4j2.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/resources/log4j2.xml b/external/storm-kafka-client/src/test/resources/log4j2.xml
new file mode 100755
index 0000000..393dd2c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/resources/log4j2.xml
@@ -0,0 +1,32 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<Configuration status="WARN">
+ <Appenders>
+ <Console name="Console" target="SYSTEM_OUT">
+ <PatternLayout pattern="%d{HH:mm:ss.SSS} [%t] %-5level %logger{36} - %msg%n" charset="UTF-8"/>
+ </Console>
+ </Appenders>
+ <Loggers>
+ <Root level="WARN">
+ <AppenderRef ref="Console"/>
+ </Root>
+ <Logger name="org.apache.storm.kafka" level="INFO" additivity="false">
+ <AppenderRef ref="Console"/>
+ </Logger>
+ </Loggers>
+</Configuration>
\ No newline at end of file
[3/4] storm git commit: Merge branch 'STORM-2413-1.x-merge' into 1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2413-1.x-merge' into 1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/b556860b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/b556860b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/b556860b
Branch: refs/heads/1.x-branch
Commit: b556860b27d7ce438caaa50557dc7675506c0eb5
Parents: 9a1ccd0 fa16019
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:42:09 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:42:09 2017 +0900
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 2 +-
.../storm/kafka/spout/KafkaSpoutMessageId.java | 4 +--
.../KafkaSpoutRetryExponentialBackoff.java | 14 ++++++++
.../kafka/spout/KafkaSpoutRetryService.java | 14 ++++++--
.../kafka/spout/KafkaSpoutRebalanceTest.java | 14 ++++++--
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 34 +++++++++++++++++---
.../test/KafkaSpoutTopologyMainNamedTopics.java | 1 -
.../src/test/resources/log4j2.xml | 32 ++++++++++++++++++
8 files changed, 103 insertions(+), 12 deletions(-)
----------------------------------------------------------------------
[4/4] storm git commit: STORM-2413: CHANGELOG
Posted by ka...@apache.org.
STORM-2413: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/7d904f0e
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/7d904f0e
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/7d904f0e
Branch: refs/heads/1.x-branch
Commit: 7d904f0e6babe5e9803937d13e9eca7d8e104431
Parents: b556860
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:42:24 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:42:24 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/7d904f0e/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index f08441b..1e1cdaa 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.1
+ * STORM-2413: Make new Kafka spout respect tuple retry limits
* STORM-2518 Handles empty name for "USER type" ACL when normalizing
* STORM-2501: Auto populate Hive Credentials
* STORM-2520: AutoHDFS should prefer cluster-wise hdfs kerberos principal
[2/4] storm git commit: STORM-2413 Make new Kafka spout respect tuple retry limits
Posted by ka...@apache.org.
STORM-2413 Make new Kafka spout respect tuple retry limits
* fix failing tests only for 1.x branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fa160198
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fa160198
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fa160198
Branch: refs/heads/1.x-branch
Commit: fa16019862b87cbeefd6575b257e11a4944c804a
Parents: 1b49a12
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Sun May 21 20:41:35 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Sun May 21 20:41:35 2017 +0900
----------------------------------------------------------------------
.../kafka/spout/KafkaSpoutRebalanceTest.java | 14 ++++++--
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 34 +++++++++++++++++---
2 files changed, 42 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/fa160198/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 81e3807..bd6e582 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -17,6 +17,7 @@ package org.apache.storm.kafka.spout;
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.any;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.Assert.assertThat;
import static org.mockito.Matchers.anyCollection;
@@ -59,15 +60,17 @@ public class KafkaSpoutRebalanceTest {
private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
private final long offsetCommitPeriodMs = 2_000;
- private final TopologyContext contextMock = mock(TopologyContext.class);
- private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
private final Map<String, Object> conf = new HashMap<>();
+ private TopologyContext contextMock;
+ private SpoutOutputCollector collectorMock;
private KafkaConsumer<String, String> consumerMock;
private KafkaConsumerFactory<String, String> consumerFactoryMock;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
+ contextMock = mock(TopologyContext.class);
+ collectorMock = mock(SpoutOutputCollector.class);
consumerMock = mock(KafkaConsumer.class);
consumerFactoryMock = new KafkaConsumerFactory<String, String>(){
@Override
@@ -165,9 +168,16 @@ public class KafkaSpoutRebalanceTest {
TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
TopicPartition assignedPartition = new TopicPartition(topic, 2);
+ when(retryServiceMock.getMessageId(Mockito.any(ConsumerRecord.class)))
+ .thenReturn(new KafkaSpoutMessageId(partitionThatWillBeRevoked, 0))
+ .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
+
//Emit a message on each partition and revoke the first partition
List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+ //Check that only two message ids were generated
+ verify(retryServiceMock, times(2)).getMessageId(Mockito.any(ConsumerRecord.class));
+
//Fail both emitted tuples
spout.fail(emittedMessageIds.get(0));
spout.fail(emittedMessageIds.get(1));
http://git-wip-us.apache.org/repos/asf/storm/blob/fa160198/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 884709d..6042c80 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -17,7 +17,6 @@
*/
package org.apache.storm.kafka.spout;
-
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
@@ -37,11 +36,14 @@ import java.util.concurrent.TimeoutException;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyListOf;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
@@ -61,7 +63,6 @@ import org.junit.Before;
import org.mockito.Captor;
import org.mockito.MockitoAnnotations;
-
public class SingleTopicKafkaSpoutTest {
@Rule
@@ -77,12 +78,15 @@ public class SingleTopicKafkaSpoutTest {
private KafkaConsumer<String, String> consumerSpy;
private KafkaConsumerFactory<String, String> consumerFactory;
private KafkaSpout<String, String> spout;
+ private int maxRetries = 3;
@Before
public void setUp() {
MockitoAnnotations.initMocks(this);
KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
.setOffsetCommitPeriodMs(commitOffsetPeriodMs)
+ .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
+ maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
.build();
this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
this.consumerFactory = new KafkaConsumerFactory<String, String>() {
@@ -100,8 +104,8 @@ public class SingleTopicKafkaSpoutTest {
for (int i = 0; i < msgCount; i++) {
ProducerRecord<String, String> producerRecord = new ProducerRecord<>(
- topicName, Integer.toString(i),
- Integer.toString(i));
+ topicName, Integer.toString(i),
+ Integer.toString(i));
kafkaUnitRule.getKafkaUnit().sendMessage(producerRecord);
}
}
@@ -316,4 +320,26 @@ public class SingleTopicKafkaSpoutTest {
expectedReemitIds.add(capturedMessageIds.get(2));
assertThat("Expected reemits to be the 3 failed tuples", new HashSet<>(reemittedMessageIds.getAllValues()), is(expectedReemitIds));
}
+
+ @Test
+ public void shouldDropMessagesAfterMaxRetriesAreReached() throws Exception {
+ //Check that if one message fails repeatedly, the retry cap limits how many times the message can be reemitted
+ int messageCount = 1;
+ initializeSpout(messageCount);
+
+ //Emit and fail the same tuple until we've reached retry limit
+ for (int i = 0; i <= maxRetries; i++) {
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdFailed = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ spout.nextTuple();
+ verify(collector).emit(anyString(), anyListOf(Object.class), messageIdFailed.capture());
+ KafkaSpoutMessageId msgId = messageIdFailed.getValue();
+ spout.fail(msgId);
+ assertThat("Expected message id number of failures to match the number of times the message has failed", msgId.numFails(), is(i + 1));
+ reset(collector);
+ }
+
+ //Verify that the tuple is not emitted again
+ spout.nextTuple();
+ verify(collector, never()).emit(anyString(), anyListOf(Object.class), anyObject());
+ }
}