You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2017/07/19 05:17:10 UTC
[1/3] storm git commit: STORM-2639 Kafka Spout incorrectly computes
numCommittedOffsets due to voids in the topic (topic compaction)
Repository: storm
Updated Branches:
refs/heads/1.1.x-branch 1c3c1ec55 -> ef1222e9b
STORM-2639 Kafka Spout incorrectly computes numCommittedOffsets due to voids in the topic (topic compaction)
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/04fe95ee
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/04fe95ee
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/04fe95ee
Branch: refs/heads/1.1.x-branch
Commit: 04fe95ee459e3d0524f3b0f3582d3d52bc988df1
Parents: 1c3c1ec
Author: Prasanna Ranganathan <pr...@flipkart.com>
Authored: Wed Jun 7 18:03:09 2017 +0530
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Jul 19 14:10:18 2017 +0900
----------------------------------------------------------------------
.../kafka/spout/internal/OffsetManager.java | 9 +-
.../storm/kafka/spout/KafkaSpoutCommitTest.java | 152 +++++++++++++++++++
2 files changed, 157 insertions(+), 4 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/04fe95ee/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index 0bf4132..5139072 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -130,12 +130,13 @@ public class OffsetManager {
* @return Number of offsets committed in this commit
*/
public long commit(OffsetAndMetadata committedOffset) {
- long preCommitCommittedOffsets = this.committedOffset;
- long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
+ final long preCommitCommittedOffsets = this.committedOffset;
+ long numCommittedOffsets = 0;
this.committedOffset = committedOffset.offset();
for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext();) {
if (iterator.next().offset() <= committedOffset.offset()) {
iterator.remove();
+ numCommittedOffsets++;
} else {
break;
}
@@ -151,8 +152,8 @@ public class OffsetManager {
LOG.trace("{}", this);
- LOG.debug("Committed offsets [{}-{} = {}] for topic-partition [{}].",
- preCommitCommittedOffsets + 1, this.committedOffset, numCommittedOffsets, tp);
+ LOG.debug("Committed [{}] offsets in the range [{}-{}] for topic-partition [{}].",
+ numCommittedOffsets, preCommitCommittedOffsets + 1, this.committedOffset, tp);
return numCommittedOffsets;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/04fe95ee/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
new file mode 100644
index 0000000..b7737c7
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -0,0 +1,152 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.*;
+import static org.mockito.Mockito.*;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
+
+public class KafkaSpoutCommitTest {
+
+ private final long offsetCommitPeriodMs = 2_000;
+ private final TopologyContext contextMock = mock(TopologyContext.class);
+ private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
+ private final Map<String, Object> conf = new HashMap<>();
+ private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaSpout<String, String> spout;
+ private KafkaSpoutConfig spoutConfig;
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private void setupSpout(Set<TopicPartition> assignedPartitions) {
+ MockitoAnnotations.initMocks(this);
+ spoutConfig = getKafkaSpoutConfigBuilder(-1)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .build();
+
+ consumerMock = mock(KafkaConsumer.class);
+ KafkaConsumerFactory<String, String> consumerFactory = new KafkaConsumerFactory<String, String>() {
+ @Override
+ public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+ return consumerMock;
+ }
+ };
+
+ //Set up a spout listening to 1 topic partition
+ spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+ spout.open(conf, contextMock, collectorMock);
+ spout.activate();
+
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+ //Assign partitions to the spout
+ ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+ consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+ }
+
+ @Test
+ public void testCommitSuccessWithOffsetVoids() {
+ //Verify that the commit logic can handle offset voids
+ try (SimulatedTime simulatedTime = new SimulatedTime()) {
+ setupSpout(Collections.singleton(partition));
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
+ List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
+ // Offsets emitted are 0,1,2,3,4,<void>,8,9
+ for (int i = 0; i < 5; i++) {
+ recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ }
+ for (int i = 8; i < 10; i++) {
+ recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+ }
+ records.put(partition, recordsForPartition);
+
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords(records));
+
+ for (int i = 0; i < recordsForPartition.size(); i++) {
+ spout.nextTuple();
+ }
+
+ ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock, times(recordsForPartition.size())).emit(anyString(), anyList(), messageIds.capture());
+
+ for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
+ spout.ack(messageId);
+ }
+
+ // Advance time and then trigger first call to kafka consumer commit; the commit will progress till offset 4
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> emptyConsumerRecords = Collections.emptyMap();
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<>(emptyConsumerRecords));
+ spout.nextTuple();
+
+ InOrder inOrder = inOrder(consumerMock);
+ inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+ inOrder.verify(consumerMock).poll(anyLong());
+
+ //verify that Offset 4 was last committed offset
+ //the offset void should be bridged in the next commit
+ Map<TopicPartition, OffsetAndMetadata> commits = commitCapture.getValue();
+ assertTrue(commits.containsKey(partition));
+ assertEquals(4, commits.get(partition).offset());
+
+ //Trigger second kafka consumer commit
+ reset(consumerMock);
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords<String, String>(emptyConsumerRecords));
+ Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
+ spout.nextTuple();
+
+ inOrder = inOrder(consumerMock);
+ inOrder.verify(consumerMock).commitSync(commitCapture.capture());
+ inOrder.verify(consumerMock).poll(anyLong());
+
+ //verify that Offset 9 was last committed offset
+ commits = commitCapture.getValue();
+ assertTrue(commits.containsKey(partition));
+ assertEquals(9, commits.get(partition).offset());
+ }
+ }
+
+}
\ No newline at end of file
[3/3] storm git commit: STORM-2639: CHANGELOG
Posted by ka...@apache.org.
STORM-2639: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ef1222e9
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ef1222e9
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ef1222e9
Branch: refs/heads/1.1.x-branch
Commit: ef1222e9b0e34ea44b616beaca9f7dfbc7ead0d9
Parents: 6640174
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Jul 19 14:16:57 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Jul 19 14:16:57 2017 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/ef1222e9/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 63d58aa..eff5516 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.1
+ * STORM-2639: Kafka Spout incorrectly computes numCommittedOffsets due to voids in the topic (topic compaction)
* STORM-2544: Fixing issue in acking of tuples that hit retry limit under manual commit mode
* STORM-2618: Add TridentKafkaStateUpdater for storm-kafka-client
* STORM-2608: Remove any pending offsets that are no longer valid
[2/3] storm git commit: Merge branch 'STORM-2639-1.1.x' into
1.1.x-branch
Posted by ka...@apache.org.
Merge branch 'STORM-2639-1.1.x' into 1.1.x-branch
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/66401741
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/66401741
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/66401741
Branch: refs/heads/1.1.x-branch
Commit: 6640174174699b3823545c01be97be1ff2d02398
Parents: 1c3c1ec 04fe95e
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Wed Jul 19 14:16:26 2017 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Wed Jul 19 14:16:26 2017 +0900
----------------------------------------------------------------------
.../kafka/spout/internal/OffsetManager.java | 9 +-
.../storm/kafka/spout/KafkaSpoutCommitTest.java | 152 +++++++++++++++++++
2 files changed, 157 insertions(+), 4 deletions(-)
----------------------------------------------------------------------