You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/29 00:29:47 UTC
[1/4] storm git commit: STORM-2104: More graceful handling of
acked/failed tuples after partition reassignment in new Kafka spout
Repository: storm
Updated Branches:
refs/heads/1.x-branch bbc8b641b -> 8ab0285d4
STORM-2104: More graceful handling of acked/failed tuples after partition reassignment in new Kafka spout
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a92b7d0a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a92b7d0a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a92b7d0a
Branch: refs/heads/1.x-branch
Commit: a92b7d0ab2bc7c5bcd0b822785a50f59099ad715
Parents: 4154490
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Tue Sep 20 13:53:55 2016 +0200
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Tue Dec 20 19:05:49 2016 +0100
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 10 +-
.../apache/storm/kafka/spout/KafkaSpout.java | 48 ++++--
.../storm/kafka/spout/KafkaSpoutConfig.java | 8 +-
.../kafka/spout/SerializableDeserializer.java | 25 +++
.../spout/internal/KafkaConsumerFactory.java | 27 +++
.../internal/KafkaConsumerFactoryDefault.java | 29 ++++
.../kafka/spout/KafkaSpoutRebalanceTest.java | 166 +++++++++++++++++++
.../SingleTopicKafkaSpoutConfiguration.java | 35 ++--
.../builders/TopicKeyValueTupleBuilder.java | 1 +
9 files changed, 318 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 2d113fa..0878fdf 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -70,13 +70,19 @@
<!--test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ <version>${mockito.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
<scope>test</scope>
</dependency>
<dependency>
http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 38095fe..245ea1a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -52,6 +52,8 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
import org.apache.kafka.common.errors.InterruptException;
@@ -64,6 +66,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Kafka
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private final KafkaConsumerFactory kafkaConsumerFactory;
private transient KafkaConsumer<K, V> kafkaConsumer;
private transient boolean consumerAutoCommitMode;
@@ -86,8 +89,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+ }
+
+ //This constructor is here for testing
+ KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+ this.kafkaConsumerFactory = kafkaConsumerFactory;
}
@Override
@@ -147,6 +156,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
retryService.retainAll(partitions);
+
+ //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
+ Set<TopicPartition> partitionsSet = new HashSet(partitions);
+ emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
for (TopicPartition tp : partitions) {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
@@ -301,16 +314,19 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
- } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
- final List<Object> tuple = tuplesBuilder.buildTuple(record);
- kafkaSpoutStreams.emit(collector, tuple, msgId);
- emitted.add(msgId);
- numUncommittedOffsets++;
- if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
- retryService.remove(msgId); // re-emitted hence remove from failed
+ } else {
+ boolean isScheduled = retryService.isScheduled(msgId);
+ if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+ final List<Object> tuple = tuplesBuilder.buildTuple(record);
+ kafkaSpoutStreams.emit(collector, tuple, msgId);
+ emitted.add(msgId);
+ numUncommittedOffsets++;
+ if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
+ retryService.remove(msgId);
+ }
+ LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+ return true;
}
- LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
- return true;
}
return false;
}
@@ -345,6 +361,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void ack(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+ if(!emitted.contains(msgId)) {
+ LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+ return;
+ }
+
if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
acked.get(msgId.getTopicPartition()).add(msgId);
}
@@ -356,8 +377,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void fail(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
- emitted.remove(msgId);
+ if(!emitted.contains(msgId)) {
+ LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+ return;
+ }
if (msgId.numFails() < maxRetries) {
+ emitted.remove(msgId);
msgId.incrementNumFails();
retryService.schedule(msgId);
} else { // limit to max number of retries
@@ -378,8 +403,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
private void subscribeKafkaConsumer() {
- kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
- kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+ kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 7c97ac9..8aa525b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -105,8 +105,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public static class Builder<K,V> {
private final Map<String, Object> kafkaProps;
- private Deserializer<K> keyDeserializer;
- private Deserializer<V> valueDeserializer;
+ private SerializableDeserializer<K> keyDeserializer;
+ private SerializableDeserializer<V> valueDeserializer;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
private int maxRetries = DEFAULT_MAX_RETRIES;
@@ -164,7 +164,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
/**
* Specifying this key deserializer overrides the property key.deserializer
*/
- public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
+ public Builder<K,V> setKeyDeserializer(SerializableDeserializer<K> keyDeserializer) {
this.keyDeserializer = keyDeserializer;
return this;
}
@@ -172,7 +172,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
/**
* Specifying this value deserializer overrides the property value.deserializer
*/
- public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
+ public Builder<K,V> setValueDeserializer(SerializableDeserializer<V> valueDeserializer) {
this.valueDeserializer = valueDeserializer;
return this;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
new file mode 100644
index 0000000..eb76a90
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * @param <T> The type this deserializer deserializes to.
+ */
+public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
new file mode 100644
index 0000000..0b253b4
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+/**
+ * This is here to enable testing
+ */
+public interface KafkaConsumerFactory<K, V> extends Serializable {
+ public KafkaConsumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
new file mode 100644
index 0000000..7900388
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {
+
+ @Override
+ public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+ kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
new file mode 100644
index 0000000..3e077ab
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.mockito.Captor;
+import static org.mockito.Mockito.reset;
+import org.mockito.MockitoAnnotations;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class KafkaSpoutRebalanceTest {
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private TopologyContext contextMock;
+ private SpoutOutputCollector collectorMock;
+ private Map conf;
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaConsumerFactory<String, String> consumerFactoryMock;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ contextMock = mock(TopologyContext.class);
+ collectorMock = mock(SpoutOutputCollector.class);
+ conf = new HashMap<>();
+ consumerMock = mock(KafkaConsumer.class);
+ consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+ }
+
+ //Returns messageIds in order of emission
+ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
+ //Setup spout with mock consumer so we can get at the rebalance listener
+ spout.open(conf, contextMock, collectorMock);
+ spout.activate();
+
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+ //Assign partitions to the spout
+ ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+ List<TopicPartition> assignedPartitions = new ArrayList<>();
+ assignedPartitions.add(partitionThatWillBeRevoked);
+ assignedPartitions.add(assignedPartition);
+ consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+ //Make the consumer return a single message for each partition
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
+ firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
+ secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords(firstPartitionRecords))
+ .thenReturn(new ConsumerRecords(secondPartitionRecords))
+ .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+
+ //Emit the messages
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
+ reset(collectorMock);
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
+
+ //Now rebalance
+ consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+ consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+
+ List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
+ emittedMessageIds.add(messageIdForRevokedPartition.getValue());
+ emittedMessageIds.add(messageIdForAssignedPartition.getValue());
+ return emittedMessageIds;
+ }
+
+ @Test
+ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
+ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Ack both emitted tuples
+ spout.ack(emittedMessageIds.get(0));
+ spout.ack(emittedMessageIds.get(1));
+
+ //Ensure the commit timer has expired
+ Thread.sleep(510);
+
+ //Make the spout commit any acked tuples
+ spout.nextTuple();
+ //Verify that it only committed the message on the assigned partition
+ verify(consumerMock).commitSync(commitCapture.capture());
+
+ Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+ assertThat(commitCaptureMap, hasKey(assignedPartition));
+ assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+ }
+
+ @Test
+ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+ KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Fail both emitted tuples
+ spout.fail(emittedMessageIds.get(0));
+ spout.fail(emittedMessageIds.get(1));
+
+ //Check that only the tuple on the currently assigned partition is retried
+ verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
+ verify(retryServiceMock).schedule(emittedMessageIds.get(1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index baece93..95b2199 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -30,6 +30,7 @@ import java.util.Map;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
public class SingleTopicKafkaSpoutConfiguration {
+
public static final String STREAM = "test_stream";
public static final String TOPIC = "test";
@@ -46,22 +47,30 @@ public class SingleTopicKafkaSpoutConfiguration {
return tp.createTopology();
}
- public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
- return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
- .setOffsetCommitPeriodMs(10_000)
- .setFirstPollOffsetStrategy(EARLIEST)
- .setMaxUncommittedOffsets(250)
- .setPollTimeoutMs(1000)
- .build();
+ public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+ }
+
+ public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+ }
+
+ public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+ return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setMaxUncommittedOffsets(250)
+ .setPollTimeoutMs(1000)
+ .build();
}
protected static KafkaSpoutRetryService getRetryService() {
return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
- KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+ KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
}
- protected static Map<String,Object> getKafkaConsumerProps(int port) {
+ protected static Map<String, Object> getKafkaConsumerProps(int port) {
Map<String, Object> props = new HashMap<>();
props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
@@ -73,13 +82,13 @@ public class SingleTopicKafkaSpoutConfiguration {
protected static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
- new TopicKeyValueTupleBuilder<String, String>(TOPIC))
- .build();
+ new TopicKeyValueTupleBuilder<String, String>(TOPIC))
+ .build();
}
public static KafkaSpoutStreams getKafkaSpoutStreams() {
final Fields outputFields = new Fields("topic", "key", "value");
- return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC}) // contents of topics test sent to test_stream
- .build();
+ return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC}) // contents of topics test sent to test_stream
+ .build();
}
}
http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
index 4f20b58..492b79d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
@@ -1,3 +1,4 @@
+
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
[2/4] storm git commit: STORM-2104: Rewrite a few parts to be
compatible with Java 7
Posted by ka...@apache.org.
STORM-2104: Rewrite a few parts to be compatible with Java 7
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e225b48
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e225b48
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e225b48
Branch: refs/heads/1.x-branch
Commit: 3e225b480cb83e60309893a10b52cd0dfaa7f673
Parents: a92b7d0
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Thu Dec 8 19:34:07 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Tue Dec 20 19:10:45 2016 +0100
----------------------------------------------------------------------
.../apache/storm/kafka/spout/KafkaSpout.java | 8 ++++++-
.../kafka/spout/KafkaSpoutRebalanceTest.java | 24 ++++++++++++++++----
.../builders/TopicKeyValueTupleBuilder.java | 1 -
3 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/3e225b48/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 245ea1a..737c810 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -159,7 +159,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
//Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
Set<TopicPartition> partitionsSet = new HashSet(partitions);
- emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
+ Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
+ while (msgIdIterator.hasNext()) {
+ KafkaSpoutMessageId msgId = msgIdIterator.next();
+ if (!partitionsSet.contains(msgId.getTopicPartition())) {
+ msgIdIterator.remove();
+ }
+ }
for (TopicPartition tp : partitions) {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
http://git-wip-us.apache.org/repos/asf/storm/blob/3e225b48/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 3e077ab..2d3eb56 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -27,24 +27,33 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
+
import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.junit.Test;
import org.mockito.ArgumentCaptor;
+
import static org.mockito.Matchers.anyCollection;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.when;
+
import org.junit.Before;
import org.mockito.Captor;
+
import static org.mockito.Mockito.reset;
+
import org.mockito.MockitoAnnotations;
+
import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.Matchers.hasKey;
import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
@@ -67,7 +76,12 @@ public class KafkaSpoutRebalanceTest {
collectorMock = mock(SpoutOutputCollector.class);
conf = new HashMap<>();
consumerMock = mock(KafkaConsumer.class);
- consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+ consumerFactoryMock = new KafkaConsumerFactory<String, String>(){
+ @Override
+ public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+ return consumerMock;
+ }
+ };
}
//Returns messageIds in order of emission
@@ -88,9 +102,9 @@ public class KafkaSpoutRebalanceTest {
//Make the consumer return a single message for each partition
Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
- firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
+ firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
- secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
+ secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
when(consumerMock.poll(anyLong()))
.thenReturn(new ConsumerRecords(firstPartitionRecords))
.thenReturn(new ConsumerRecords(secondPartitionRecords))
@@ -99,11 +113,11 @@ public class KafkaSpoutRebalanceTest {
//Emit the messages
spout.nextTuple();
ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
+ verify(collectorMock).emit(anyString(), anyList(), messageIdForRevokedPartition.capture());
reset(collectorMock);
spout.nextTuple();
ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
- verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
+ verify(collectorMock).emit(anyString(), anyList(), messageIdForAssignedPartition.capture());
//Now rebalance
consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
http://git-wip-us.apache.org/repos/asf/storm/blob/3e225b48/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
index 492b79d..4f20b58 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
@@ -1,4 +1,3 @@
-
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
[3/4] storm git commit: Merge branch 'STORM-2104-1.x' of
https://github.com/srdo/storm into STORM-2104-1.x-merge
Posted by ka...@apache.org.
Merge branch 'STORM-2104-1.x' of https://github.com/srdo/storm into STORM-2104-1.x-merge
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e939d2c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e939d2c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e939d2c
Branch: refs/heads/1.x-branch
Commit: 2e939d2ca84ee34df81aa508bda2f6f09696aa40
Parents: bbc8b64 3e225b4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Dec 29 08:07:11 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Dec 29 08:07:11 2016 +0900
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 10 +-
.../apache/storm/kafka/spout/KafkaSpout.java | 54 ++++--
.../storm/kafka/spout/KafkaSpoutConfig.java | 8 +-
.../kafka/spout/SerializableDeserializer.java | 25 +++
.../spout/internal/KafkaConsumerFactory.java | 27 +++
.../internal/KafkaConsumerFactoryDefault.java | 29 +++
.../kafka/spout/KafkaSpoutRebalanceTest.java | 180 +++++++++++++++++++
.../SingleTopicKafkaSpoutConfiguration.java | 35 ++--
8 files changed, 337 insertions(+), 31 deletions(-)
----------------------------------------------------------------------
[4/4] storm git commit: STORM-2104: CHANGELOG
Posted by ka...@apache.org.
STORM-2104: CHANGELOG
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8ab0285d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8ab0285d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8ab0285d
Branch: refs/heads/1.x-branch
Commit: 8ab0285d4bd0202eb472ec069ca04d2757e8eae6
Parents: 2e939d2
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Dec 29 09:29:34 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Dec 29 09:29:34 2016 +0900
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/8ab0285d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 54263d1..657e8f0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 1.1.0
+ * STORM-2104: New Kafka spout crashes if partitions are reassigned while tuples are in-flight
* STORM-2257: Add built in support for sum function with different types.
* STORM-2082: add sql external module storm-sql-hdfs
* STORM-2256: storm-pmml breaks on java 1.7