You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/08 18:14:35 UTC
[1/5] storm git commit: STORM-2104: More graceful handling of
acked/failed tuples after partition reassignment in new Kafka spout
Repository: storm
Updated Branches:
refs/heads/master e9cc74869 -> 806e3635c
STORM-2104: More graceful handling of acked/failed tuples after partition reassignment in new Kafka spout
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11fcaff0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11fcaff0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11fcaff0
Branch: refs/heads/master
Commit: 11fcaff0e3f807419db2609bab07e0efb47f8df2
Parents: 43bdc03
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Tue Sep 20 13:53:55 2016 +0200
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Fri Nov 4 16:22:22 2016 +0100
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 10 +-
.../apache/storm/kafka/spout/KafkaSpout.java | 46 +++--
.../storm/kafka/spout/KafkaSpoutConfig.java | 8 +-
.../kafka/spout/SerializableDeserializer.java | 25 +++
.../spout/internal/KafkaConsumerFactory.java | 27 +++
.../internal/KafkaConsumerFactoryDefault.java | 29 ++++
.../kafka/spout/KafkaSpoutRebalanceTest.java | 166 +++++++++++++++++++
.../SingleTopicKafkaSpoutConfiguration.java | 93 +++++++++++
.../builders/TopicKeyValueTupleBuilder.java | 40 +++++
9 files changed, 427 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index f7a387c..34b9fdc 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -57,13 +57,19 @@
<!--test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ <version>${mockito.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
<scope>test</scope>
</dependency>
</dependencies>
http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 4389acb..5534082 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -52,6 +52,8 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
public class KafkaSpout<K, V> extends BaseRichSpout {
private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
@@ -62,6 +64,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
// Kafka
private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+ private final KafkaConsumerFactory kafkaConsumerFactory;
private transient KafkaConsumer<K, V> kafkaConsumer;
private transient boolean consumerAutoCommitMode;
@@ -84,8 +87,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+ }
+
+ //This constructor is here for testing
+ KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
this.kafkaSpoutConfig = kafkaSpoutConfig; // Pass in configuration
this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+ this.kafkaConsumerFactory = kafkaConsumerFactory;
}
@Override
@@ -145,6 +154,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
retryService.retainAll(partitions);
+
+ //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
+ Set<TopicPartition> partitionsSet = new HashSet(partitions);
+ emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
for (TopicPartition tp : partitions) {
final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
@@ -286,15 +299,18 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
- } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
- final List<Object> tuple = tuplesBuilder.buildTuple(record);
- kafkaSpoutStreams.emit(collector, tuple, msgId);
- emitted.add(msgId);
- numUncommittedOffsets++;
- if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
- retryService.remove(msgId); // re-emitted hence remove from failed
+ } else {
+ boolean isScheduled = retryService.isScheduled(msgId);
+ if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+ final List<Object> tuple = tuplesBuilder.buildTuple(record);
+ kafkaSpoutStreams.emit(collector, tuple, msgId);
+ emitted.add(msgId);
+ numUncommittedOffsets++;
+ if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
+ retryService.remove(msgId);
+ }
+ LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
- LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
}
}
@@ -328,6 +344,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void ack(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+ if(!emitted.contains(msgId)) {
+ LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+ return;
+ }
+
if (!consumerAutoCommitMode) { // Only need to keep track of acked tuples if commits are not done automatically
acked.get(msgId.getTopicPartition()).add(msgId);
}
@@ -339,8 +360,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
@Override
public void fail(Object messageId) {
final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
- emitted.remove(msgId);
+ if(!emitted.contains(msgId)) {
+ LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+ return;
+ }
if (msgId.numFails() < maxRetries) {
+ emitted.remove(msgId);
msgId.incrementNumFails();
retryService.schedule(msgId);
} else { // limit to max number of retries
@@ -357,8 +382,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
}
private void subscribeKafkaConsumer() {
- kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
- kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+ kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 7c97ac9..8aa525b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -105,8 +105,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
public static class Builder<K,V> {
private final Map<String, Object> kafkaProps;
- private Deserializer<K> keyDeserializer;
- private Deserializer<V> valueDeserializer;
+ private SerializableDeserializer<K> keyDeserializer;
+ private SerializableDeserializer<V> valueDeserializer;
private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
private int maxRetries = DEFAULT_MAX_RETRIES;
@@ -164,7 +164,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
/**
* Specifying this key deserializer overrides the property key.deserializer
*/
- public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
+ public Builder<K,V> setKeyDeserializer(SerializableDeserializer<K> keyDeserializer) {
this.keyDeserializer = keyDeserializer;
return this;
}
@@ -172,7 +172,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
/**
* Specifying this value deserializer overrides the property value.deserializer
*/
- public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
+ public Builder<K,V> setValueDeserializer(SerializableDeserializer<V> valueDeserializer) {
this.valueDeserializer = valueDeserializer;
return this;
}
http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
new file mode 100644
index 0000000..eb76a90
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * @param <T> The type this deserializer deserializes to.
+ */
+public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable {
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
new file mode 100644
index 0000000..0b253b4
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+/**
+ * This is here to enable testing
+ */
+public interface KafkaConsumerFactory<K, V> extends Serializable {
+ public KafkaConsumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
new file mode 100644
index 0000000..7900388
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {
+
+ @Override
+ public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+ return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+ kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
new file mode 100644
index 0000000..b5428f9
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.mockito.Captor;
+import static org.mockito.Mockito.reset;
+import org.mockito.MockitoAnnotations;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class KafkaSpoutRebalanceTest {
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private TopologyContext contextMock;
+ private SpoutOutputCollector collectorMock;
+ private Map conf;
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaConsumerFactory<String, String> consumerFactoryMock;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ contextMock = mock(TopologyContext.class);
+ collectorMock = mock(SpoutOutputCollector.class);
+ conf = new HashMap<>();
+ consumerMock = mock(KafkaConsumer.class);
+ consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+ }
+
+ //Returns messageIds in order of emission
+ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
+ //Setup spout with mock consumer so we can get at the rebalance listener
+ spout.open(conf, contextMock, collectorMock);
+ spout.activate();
+
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+ //Assign partitions to the spout
+ ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+ List<TopicPartition> assignedPartitions = new ArrayList<>();
+ assignedPartitions.add(partitionThatWillBeRevoked);
+ assignedPartitions.add(assignedPartition);
+ consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+ //Make the consumer return a single message for each partition
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
+ firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
+ secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords(firstPartitionRecords))
+ .thenReturn(new ConsumerRecords(secondPartitionRecords))
+ .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+
+ //Emit the messages
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
+ reset(collectorMock);
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
+
+ //Now rebalance
+ consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+ consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+
+ List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
+ emittedMessageIds.add(messageIdForRevokedPartition.getValue());
+ emittedMessageIds.add(messageIdForAssignedPartition.getValue());
+ return emittedMessageIds;
+ }
+
+ @Test
+ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
+ String topic = SingleTopicKafkaSpoutConfiguration.topic;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Ack both emitted tuples
+ spout.ack(emittedMessageIds.get(0));
+ spout.ack(emittedMessageIds.get(1));
+
+ //Ensure the commit timer has expired
+ Thread.sleep(510);
+
+ //Make the spout commit any acked tuples
+ spout.nextTuple();
+ //Verify that it only committed the message on the assigned partition
+ verify(consumerMock).commitSync(commitCapture.capture());
+
+ Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+ assertThat(commitCaptureMap, hasKey(assignedPartition));
+ assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+ }
+
+ @Test
+ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+ KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+ String topic = SingleTopicKafkaSpoutConfiguration.topic;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Fail both emitted tuples
+ spout.fail(emittedMessageIds.get(0));
+ spout.fail(emittedMessageIds.get(1));
+
+ //Check that only the tuple on the currently assigned partition is retried
+ verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
+ verify(retryServiceMock).schedule(emittedMessageIds.get(1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
new file mode 100644
index 0000000..157543c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.builders;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.*;
+import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+public class SingleTopicKafkaSpoutConfiguration {
+ public static final String stream = "test_stream";
+ public static final String topic = "test";
+
+ static public Config getConfig() {
+ Config config = new Config();
+ config.setDebug(true);
+ return config;
+ }
+
+ static public StormTopology getTopologyKafkaSpout(int port) {
+ final TopologyBuilder tp = new TopologyBuilder();
+ tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), port)), 1);
+ tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", stream);
+ return tp.createTopology();
+ }
+
+ static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+ }
+
+ static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+ }
+
+ static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+ return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+ .setFirstPollOffsetStrategy(EARLIEST)
+ .setMaxUncommittedOffsets(250)
+ .setPollTimeoutMs(1000)
+ .build();
+ }
+
+ static protected KafkaSpoutRetryService getRetryService() {
+ return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
+ KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+ }
+
+ static protected Map<String,Object> getKafkaConsumerProps(int port) {
+ Map<String, Object> props = new HashMap<>();
+ props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
+ props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+ props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+ props.put("max.poll.records", "5");
+ return props;
+ }
+
+ static protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+ return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
+ new TopicKeyValueTupleBuilder<String, String>(topic))
+ .build();
+ }
+
+ static public KafkaSpoutStreams getKafkaSpoutStreams() {
+ final Fields outputFields = new Fields("topic", "key", "value");
+ return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, stream, new String[]{topic}) // contents of topics test sent to test_stream
+ .build();
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
new file mode 100644
index 0000000..d6b8a2a
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.builders;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicKeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+ /**
+ * @param topics list of topics that use this implementation to build tuples
+ */
+ public TopicKeyValueTupleBuilder(String... topics) {
+ super(topics);
+ }
+
+ @Override
+ public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+ return new Values(consumerRecord.topic(),
+ consumerRecord.key(),
+ consumerRecord.value());
+ }
+}
\ No newline at end of file
[2/5] storm git commit: Merge branch 'master' of
https://github.com/apache/storm into STORM-2104
Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/apache/storm into STORM-2104
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd1f5c54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd1f5c54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd1f5c54
Branch: refs/heads/master
Commit: bd1f5c54752f67b484a83c26667331234234d3a3
Parents: 11fcaff a912cf3
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Sun Nov 20 12:49:49 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Sun Nov 20 13:01:22 2016 +0100
----------------------------------------------------------------------
CHANGELOG.md | 16 +
examples/storm-kafka-client-examples/pom.xml | 134 +++
.../TridentKafkaClientWordCountNamedTopics.java | 156 ++++
...identKafkaClientWordCountWildcardTopics.java | 46 ++
examples/storm-kafka-examples/pom.xml | 18 +
.../storm/kafka/TridentKafkaTopology.java | 91 ---
.../kafka/trident/KafkaProducerTopology.java | 67 ++
.../storm/kafka/trident/LocalSubmitter.java | 91 +++
.../trident/TridentKafkaConsumerTopology.java | 94 +++
.../kafka/trident/TridentKafkaTopology.java | 89 ++
.../kafka/trident/TridentKafkaWordCount.java | 140 ++++
examples/storm-starter/pom.xml | 16 -
.../starter/trident/DebugMemoryMapState.java | 2 +-
.../starter/trident/TridentKafkaWordCount.java | 278 -------
external/sql/pom.xml | 1 +
.../apache/storm/sql/parser/SqlCreateTable.java | 20 +-
.../test/org/apache/storm/sql/TestStormSql.java | 11 +-
.../storm-sql-external/storm-sql-kafka/pom.xml | 4 -
.../sql/kafka/KafkaDataSourcesProvider.java | 63 +-
.../sql/kafka/TestKafkaDataSourcesProvider.java | 31 +-
.../storm-sql-mongodb/pom.xml | 84 ++
.../sql/mongodb/MongoDataSourcesProvider.java | 121 +++
...apache.storm.sql.runtime.DataSourcesProvider | 16 +
.../mongodb/TestMongoDataSourcesProvider.java | 122 +++
.../storm-sql-external/storm-sql-redis/pom.xml | 4 -
.../sql/redis/RedisDataSourcesProvider.java | 48 +-
.../sql/redis/TestRedisDataSourcesProvider.java | 26 +-
external/sql/storm-sql-runtime/pom.xml | 14 +
.../storm/sql/runtime/DataSourcesProvider.java | 5 +-
.../storm/sql/runtime/DataSourcesRegistry.java | 5 +-
.../sql/runtime/serde/avro/AvroScheme.java | 74 ++
.../sql/runtime/serde/avro/AvroSerializer.java | 72 ++
.../sql/runtime/serde/avro/CachedSchemas.java | 41 +
.../storm/sql/runtime/serde/csv/CsvScheme.java | 70 ++
.../sql/runtime/serde/csv/CsvSerializer.java | 59 ++
.../sql/runtime/serde/json/JsonScheme.java | 2 +-
.../storm/sql/runtime/serde/tsv/TsvScheme.java | 58 ++
.../sql/runtime/serde/tsv/TsvSerializer.java | 54 ++
.../storm/sql/runtime/utils/FieldInfoUtils.java | 39 +
.../storm/sql/runtime/utils/SerdeUtils.java | 123 +++
.../apache/storm/sql/runtime/utils/Utils.java | 55 ++
.../apache/storm/sql/TestAvroSerializer.java | 72 ++
.../org/apache/storm/sql/TestCsvSerializer.java | 54 ++
.../org/apache/storm/sql/TestTsvSerializer.java | 46 ++
.../storm/hdfs/bolt/AbstractHdfsBolt.java | 25 +-
external/storm-kafka-client/pom.xml | 26 +
.../apache/storm/kafka/spout/KafkaSpout.java | 31 +-
.../storm/kafka/spout/KafkaSpoutStream.java | 2 +-
.../storm/kafka/spout/KafkaSpoutStreams.java | 3 +
.../spout/KafkaSpoutStreamsNamedTopics.java | 11 +
.../spout/KafkaSpoutStreamsWildcardTopics.java | 6 +
.../trident/KafkaTridentSpoutBatchMetadata.java | 83 ++
.../spout/trident/KafkaTridentSpoutEmitter.java | 197 +++++
.../spout/trident/KafkaTridentSpoutManager.java | 119 +++
.../spout/trident/KafkaTridentSpoutOpaque.java | 80 ++
.../KafkaTridentSpoutOpaqueCoordinator.java | 64 ++
.../KafkaTridentSpoutTopicPartition.java | 68 ++
...KafkaTridentSpoutTopicPartitionRegistry.java | 48 ++
.../trident/KafkaTridentSpoutTransactional.java | 48 ++
.../kafka/spout/KafkaSpoutRebalanceTest.java | 4 +-
.../kafka/spout/SingleTopicKafkaSpoutTest.java | 240 ++++++
.../SingleTopicKafkaSpoutConfiguration.java | 30 +-
.../builders/TopicKeyValueTupleBuilder.java | 2 +-
.../kafka/trident/OpaqueTridentKafkaSpout.java | 5 +-
pom.xml | 1 +
.../org/apache/storm/daemon/local_executor.clj | 42 -
.../src/clj/org/apache/storm/daemon/worker.clj | 818 -------------------
storm-core/src/clj/org/apache/storm/testing.clj | 28 +-
.../src/jvm/org/apache/storm/Constants.java | 6 +-
.../src/jvm/org/apache/storm/daemon/Task.java | 46 +-
.../storm/daemon/supervisor/AdvancedFSOps.java | 9 +-
.../storm/daemon/supervisor/BasicContainer.java | 2 +-
.../storm/daemon/supervisor/LocalContainer.java | 11 +-
.../daemon/supervisor/ReadClusterState.java | 2 +-
.../storm/daemon/worker/LogConfigManager.java | 156 ++++
.../org/apache/storm/daemon/worker/Worker.java | 454 ++++++++++
.../apache/storm/daemon/worker/WorkerState.java | 691 ++++++++++++++++
.../jvm/org/apache/storm/executor/Executor.java | 55 +-
.../apache/storm/executor/ExecutorShutdown.java | 5 +-
.../apache/storm/executor/ExecutorTransfer.java | 15 +-
.../apache/storm/executor/IRunningExecutor.java | 2 +-
.../apache/storm/executor/LocalExecutor.java | 56 ++
.../storm/executor/bolt/BoltExecutor.java | 10 +-
.../storm/executor/spout/SpoutExecutor.java | 3 +-
.../DeserializingConnectionCallback.java | 10 +-
.../storm/scheduler/resource/RAS_Node.java | 4 +-
.../org/apache/storm/security/auth/AutoSSL.java | 161 ++++
.../storm/security/auth/ThriftClient.java | 13 +-
.../org/apache/storm/task/TopologyContext.java | 3 +
.../org/apache/storm/utils/TransferDrainer.java | 26 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 116 ++-
.../apache/storm/messaging/netty_unit_test.clj | 13 +-
.../test/clj/org/apache/storm/nimbus_test.clj | 6 +-
.../test/clj/org/apache/storm/worker_test.clj | 205 -----
.../daemon/supervisor/BasicContainerTest.java | 2 +-
.../daemon/worker/LogConfigManagerTest.java | 219 +++++
.../apache/storm/daemon/worker/WorkerTest.java | 39 +
.../apache/storm/security/auth/AutoSSLTest.java | 136 +++
storm-dist/binary/src/main/assembly/binary.xml | 7 +
99 files changed, 5299 insertions(+), 1797 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/pom.xml
index 34b9fdc,2d9a844..2b56336
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@@ -70,8 -77,21 +84,20 @@@
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
- <version>4.11</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>info.batey.kafka</groupId>
+ <artifactId>kafka-unit</artifactId>
+ <version>0.6</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>log4j-over-slf4j</artifactId>
+ <version>${log4j-over-slf4j.version}</version>
+ <scope>test</scope>
+ </dependency>
</dependencies>
<build>
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 5534082,439492b..ce01a76
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@@ -299,19 -289,18 +302,21 @@@ public class KafkaSpout<K, V> extends B
LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
} else if (emitted.contains(msgId)) { // has been emitted and it's pending ack or fail
LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
- } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
- final List<Object> tuple = tuplesBuilder.buildTuple(record);
- kafkaSpoutStreams.emit(collector, tuple, msgId);
- emitted.add(msgId);
- numUncommittedOffsets++;
- if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
- retryService.remove(msgId); // re-emitted hence remove from failed
+ } else {
+ boolean isScheduled = retryService.isScheduled(msgId);
+ if (!isScheduled || retryService.isReady(msgId)) { // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+ final List<Object> tuple = tuplesBuilder.buildTuple(record);
+ kafkaSpoutStreams.emit(collector, tuple, msgId);
+ emitted.add(msgId);
+ numUncommittedOffsets++;
+ if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
+ retryService.remove(msgId);
+ }
+ LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
++ return true;
}
- LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
- return true;
}
+ return false;
}
private void commitOffsetsForAckedTuples() {
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index b5428f9,0000000..3e077ab
mode 100644,000000..100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@@ -1,166 -1,0 +1,166 @@@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.mockito.Captor;
+import static org.mockito.Mockito.reset;
+import org.mockito.MockitoAnnotations;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class KafkaSpoutRebalanceTest {
+
+ @Captor
+ private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+ private TopologyContext contextMock;
+ private SpoutOutputCollector collectorMock;
+ private Map conf;
+ private KafkaConsumer<String, String> consumerMock;
+ private KafkaConsumerFactory<String, String> consumerFactoryMock;
+
+ @Before
+ public void setUp() {
+ MockitoAnnotations.initMocks(this);
+ contextMock = mock(TopologyContext.class);
+ collectorMock = mock(SpoutOutputCollector.class);
+ conf = new HashMap<>();
+ consumerMock = mock(KafkaConsumer.class);
+ consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+ }
+
+ //Returns messageIds in order of emission
+ private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
+ //Setup spout with mock consumer so we can get at the rebalance listener
+ spout.open(conf, contextMock, collectorMock);
+ spout.activate();
+
+ ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+ verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+ //Assign partitions to the spout
+ ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+ List<TopicPartition> assignedPartitions = new ArrayList<>();
+ assignedPartitions.add(partitionThatWillBeRevoked);
+ assignedPartitions.add(assignedPartition);
+ consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+ //Make the consumer return a single message for each partition
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
+ firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
+ secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
+ when(consumerMock.poll(anyLong()))
+ .thenReturn(new ConsumerRecords(firstPartitionRecords))
+ .thenReturn(new ConsumerRecords(secondPartitionRecords))
+ .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+
+ //Emit the messages
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
+ reset(collectorMock);
+ spout.nextTuple();
+ ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+ verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
+
+ //Now rebalance
+ consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+ consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+
+ List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
+ emittedMessageIds.add(messageIdForRevokedPartition.getValue());
+ emittedMessageIds.add(messageIdForAssignedPartition.getValue());
+ return emittedMessageIds;
+ }
+
+ @Test
+ public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
- String topic = SingleTopicKafkaSpoutConfiguration.topic;
++ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Ack both emitted tuples
+ spout.ack(emittedMessageIds.get(0));
+ spout.ack(emittedMessageIds.get(1));
+
+ //Ensure the commit timer has expired
+ Thread.sleep(510);
+
+ //Make the spout commit any acked tuples
+ spout.nextTuple();
+ //Verify that it only committed the message on the assigned partition
+ verify(consumerMock).commitSync(commitCapture.capture());
+
+ Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+ assertThat(commitCaptureMap, hasKey(assignedPartition));
+ assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+ }
+
+ @Test
+ public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+ //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+ KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
+ KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
- String topic = SingleTopicKafkaSpoutConfiguration.topic;
++ String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+ TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+ TopicPartition assignedPartition = new TopicPartition(topic, 2);
+
+ //Emit a message on each partition and revoke the first partition
+ List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+
+ //Fail both emitted tuples
+ spout.fail(emittedMessageIds.get(0));
+ spout.fail(emittedMessageIds.get(1));
+
+ //Check that only the tuple on the currently assigned partition is retried
+ verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
+ verify(retryServiceMock).schedule(emittedMessageIds.get(1));
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 157543c,baece93..6921f7c
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@@ -46,17 -46,9 +46,17 @@@ public class SingleTopicKafkaSpoutConfi
return tp.createTopology();
}
- static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
- public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
- return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
- .setOffsetCommitPeriodMs(10_000)
++ static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+ }
-
- static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
++
++ static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
+ return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+ }
+
+ static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+ return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
+ .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
.setFirstPollOffsetStrategy(EARLIEST)
.setMaxUncommittedOffsets(250)
.setPollTimeoutMs(1000)
[5/5] storm git commit: Add STORM-2104 to Changelog
Posted by bo...@apache.org.
Add STORM-2104 to Changelog
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/806e3635
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/806e3635
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/806e3635
Branch: refs/heads/master
Commit: 806e3635c301bf02d4525f1acb837110a87c4e26
Parents: 23899bd
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Dec 8 12:14:07 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Dec 8 12:14:07 2016 -0600
----------------------------------------------------------------------
CHANGELOG.md | 1 +
1 file changed, 1 insertion(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/806e3635/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 49dfcef..5f73dc7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
## 2.0.0
+ * STORM-2104: More graceful handling of acked/failed tuples after partition reassignment
* STORM-1281: LocalCluster, testing4j and testing.clj to java
* STORM-2226: Fix kafka spout offset lag ui for kerberized kafka
* STORM-1276: line for line translation of nimbus to java
[4/5] storm git commit: Merge branch 'STORM-2104' of
https://github.com/srdo/storm into STORM-2104
Posted by bo...@apache.org.
Merge branch 'STORM-2104' of https://github.com/srdo/storm into STORM-2104
STORM-2104: More graceful handling of acked/failed tuples after partition reassignment
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/23899bd8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/23899bd8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/23899bd8
Branch: refs/heads/master
Commit: 23899bd8484b0d3f035436ad626a08a4581a00ee
Parents: e9cc748 29b52e9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Dec 8 12:02:18 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Dec 8 12:02:18 2016 -0600
----------------------------------------------------------------------
external/storm-kafka-client/pom.xml | 9 +-
.../apache/storm/kafka/spout/KafkaSpout.java | 48 ++++--
.../storm/kafka/spout/KafkaSpoutConfig.java | 8 +-
.../kafka/spout/SerializableDeserializer.java | 25 +++
.../spout/internal/KafkaConsumerFactory.java | 27 +++
.../internal/KafkaConsumerFactoryDefault.java | 29 ++++
.../kafka/spout/KafkaSpoutRebalanceTest.java | 166 +++++++++++++++++++
.../SingleTopicKafkaSpoutConfiguration.java | 14 +-
8 files changed, 306 insertions(+), 20 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/23899bd8/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
[3/5] storm git commit: Merge branch 'master' of
https://github.com/apache/storm into STORM-2104
Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/apache/storm into STORM-2104
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29b52e9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29b52e9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29b52e9c
Branch: refs/heads/master
Commit: 29b52e9c266b130642d99e665ff73c55dfabba13
Parents: bd1f5c5 57998b4
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Nov 29 20:54:47 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Tue Nov 29 20:54:47 2016 +0100
----------------------------------------------------------------------
CHANGELOG.md | 9 +
docs/index.md | 12 +-
docs/storm-cassandra.md | 23 --
docs/storm-druid.md | 119 +++++++++
docs/storm-kafka-client.md | 188 ++++++++++++++
docs/storm-kinesis.md | 136 ++++++++++
docs/storm-mongodb.md | 23 --
docs/storm-opentsdb.md | 52 ++++
docs/storm-redis.md | 24 --
docs/storm-sql-reference.md | 111 +++++++-
examples/storm-starter/pom.xml | 1 +
external/flux/pom.xml | 6 -
external/storm-cassandra/pom.xml | 7 -
external/storm-eventhubs/pom.xml | 6 -
external/storm-hdfs/pom.xml | 6 -
.../apache/storm/hdfs/trident/HdfsState.java | 31 +--
external/storm-hive/pom.xml | 6 -
external/storm-jdbc/pom.xml | 6 -
external/storm-jms/examples/pom.xml | 1 -
external/storm-jms/pom.xml | 12 -
external/storm-kafka-client/pom.xml | 5 -
.../storm/kafka/monitor/KafkaOffsetLagUtil.java | 6 +-
external/storm-kafka/pom.xml | 6 -
.../storm/kafka/DynamicBrokersReader.java | 8 +-
.../src/jvm/org/apache/storm/kafka/ZkState.java | 2 +-
.../storm/kinesis/spout/ZKConnection.java | 2 +-
external/storm-metrics/pom.xml | 1 -
external/storm-solr/pom.xml | 6 -
.../maven-shade-clojure-transformer/pom.xml | 1 -
storm-buildtools/storm-maven-plugins/pom.xml | 2 +-
storm-core/pom.xml | 2 +-
.../storm/cluster/StormClusterStateImpl.java | 8 +-
.../jvm/org/apache/storm/daemon/DrpcServer.java | 2 +-
.../org/apache/storm/daemon/StormCommon.java | 16 +-
.../org/apache/storm/drpc/ReturnResults.java | 14 +-
.../jvm/org/apache/storm/executor/Executor.java | 7 +-
.../apache/storm/grouping/ShuffleGrouping.java | 12 +-
.../apache/storm/multilang/JsonSerializer.java | 11 +-
.../jvm/org/apache/storm/spout/ShellSpout.java | 1 +
.../storm/task/GeneralTopologyContext.java | 11 +-
.../apache/storm/topology/TopologyBuilder.java | 12 +-
.../trident/drpc/ReturnResultsReducer.java | 11 +-
.../state/JSONNonTransactionalSerializer.java | 5 +-
.../trident/state/JSONOpaqueSerializer.java | 6 +-
.../state/JSONTransactionalSerializer.java | 6 +-
.../storm/trident/testing/TuplifyArgs.java | 14 +-
.../topology/state/TransactionalState.java | 2 +-
.../apache/storm/ui/FilterConfiguration.java | 2 +-
.../apache/storm/utils/TopologySpoutLag.java | 7 +-
.../src/jvm/org/apache/storm/utils/Utils.java | 4 +-
.../src/native/worker-launcher/impl/main.c | 3 +-
.../worker-launcher/impl/worker-launcher.c | 260 ++++++-------------
.../worker-launcher/impl/worker-launcher.h | 16 +-
.../storm/grouping/ShuffleGroupingTest.java | 147 +++++++++++
storm-dist/binary/pom.xml | 2 +-
storm-dist/source/pom.xml | 2 +-
storm-multilang/javascript/pom.xml | 2 +-
storm-multilang/python/pom.xml | 2 +-
storm-multilang/ruby/pom.xml | 2 +-
storm-rename-hack/pom.xml | 3 +-
60 files changed, 982 insertions(+), 428 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/29b52e9c/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/pom.xml
index 2b56336,a7971e2..97ed359
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@@ -71,19 -71,7 +71,14 @@@
<!--test dependencies -->
<dependency>
<groupId>org.mockito</groupId>
- <artifactId>mockito-all</artifactId>
+ <artifactId>mockito-core</artifactId>
+ <scope>test</scope>
+ <version>${mockito.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.hamcrest</groupId>
+ <artifactId>hamcrest-all</artifactId>
+ <version>1.3</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>junit</groupId>
- <artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>