You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by sr...@apache.org on 2017/07/18 09:10:46 UTC
[1/3] storm git commit: [STORM-2544] Fixing issue in acking of tuples
that hit retry limit under manual commit mode
Repository: storm
Updated Branches:
refs/heads/master 110c40515 -> 4fce9380c
[STORM-2544] Fixing issue in acking of tuples that hit retry limit under manual commit mode
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d250ae0b
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d250ae0b
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d250ae0b
Branch: refs/heads/master
Commit: d250ae0b1b71af24f57950828111b39b4679e14e
Parents: 7f33447
Author: Prasanna Ranganathan <pr...@flipkart.com>
Authored: Wed Jun 7 18:37:24 2017 +0530
Committer: Prasanna Ranganathan <pr...@flipkart.com>
Committed: Mon Jul 17 17:55:45 2017 +0530
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 5 +-
.../kafka/spout/KafkaSpoutRetryLimitTest.java | 126 +++++++++++++++++++
2 files changed, 130 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/d250ae0b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 09795ed..07acdd1 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -444,11 +444,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
+ " Partitions may have been reassigned. Ignoring message [{}]", msgId);
return;
}
- emitted.remove(msgId);
msgId.incrementNumFails();
if (!retryService.schedule(msgId)) {
LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
+ // this tuple should be removed from emitted only inside the ack() method. This is to ensure
+ // that the OffsetManager for that TopicPartition is updated and allows commit progression
ack(msgId);
+ } else {
+ emitted.remove(msgId);
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/d250ae0b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
new file mode 100644
index 0000000..d84f4da
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -0,0 +1,126 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
+
+public class KafkaSpoutRetryLimitTest {
+
+ private final long offsetCommitPeriodMs = 2_000;
+ private final TopologyContext contextMock = mock(TopologyContext.class);
+ private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaSpout<String, String> spout;
+ private KafkaSpoutConfig spoutConfig;
+
+ public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
+ new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+ 0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+ private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+ spoutConfig = getKafkaSpoutConfigBuilder(-1)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+ .build();
+
+ consumerMock = mock(KafkaConsumer.class);
+ KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+ spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+ spout.open(conf, contextMock, collectorMock);
+ spout.activate();
+
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+ //Assign partitions to the spout
+ ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+ consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+ }
+
+ @Test
+ public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
+ //Spout should ack failed messages after they hit the retry limit
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ setupSpoutWithNoRetry(Collections.singleton(partition));
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+ int lastOffset = 3;
+ for (int i = 0; i <= lastOffset; i++) {
+ recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ }
+ records.put(partition, recordsForPartition);
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords(records));
+
+ for (int i = 0; i < recordsForPartition.size(); i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture());
+
+ for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+ spout.fail(messageId);
+ }
+
+ // Advance time and then trigger call to kafka consumer commit
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ ArgumentCaptor<Map> committedOffsets=ArgumentCaptor.forClass(Map.class);
+ InOrder inOrder = inOrder(consumerMock);
+ inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
+ inOrder.verify(consumerMock).poll(anyLong());
+
+ //verify that Offset 3 was committed for the given TopicPartition
+ assertTrue(committedOffsets.getValue().containsKey(partition));
+ assertEquals(lastOffset, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset());
+ }
+ }
+
+}
\ No newline at end of file
[2/3] storm git commit: Merge branch 'STORM-2544' of
https://github.com/askprasanna/storm into asfgit-master
Posted by sr...@apache.org.
Merge branch 'STORM-2544' of https://github.com/askprasanna/storm into asfgit-master
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/89ad4215
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/89ad4215
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/89ad4215
Branch: refs/heads/master
Commit: 89ad42153b93e0eccebd25cd462157975ea53f2a
Parents: 110c405 d250ae0
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Tue Jul 18 10:46:05 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jul 18 10:46:05 2017 +0200
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 5 +-
.../kafka/spout/KafkaSpoutRetryLimitTest.java | 126 +++++++++++++++++++
2 files changed, 130 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/89ad4215/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
[3/3] storm git commit: Changelog: STORM-2544
Posted by sr...@apache.org.
Changelog: STORM-2544
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/4fce9380
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/4fce9380
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/4fce9380
Branch: refs/heads/master
Commit: 4fce9380c237a0b320313328cbae550c524949b3
Parents: 89ad421
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Tue Jul 18 11:06:11 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jul 18 11:06:11 2017 +0200
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/4fce9380/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6eba831..d7e8242 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2544: Fixing issue in acking of tuples that hit retry limit under manual commit mode
* STORM-2622: Add owner resource summary on storm UI
* STORM-2634: Apply new code style to storm-sql-runtime
* STORM-2633: Apply new code style to storm-sql-redis