You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2016/12/29 00:29:47 UTC

[1/4] storm git commit: STORM-2104: More graceful handling of acked/failed tuples after partition reassignment in new Kafka spout

Repository: storm
Updated Branches:
  refs/heads/1.x-branch bbc8b641b -> 8ab0285d4


STORM-2104: More graceful handling of acked/failed tuples after partition reassignment in new Kafka spout


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a92b7d0a
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a92b7d0a
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a92b7d0a

Branch: refs/heads/1.x-branch
Commit: a92b7d0ab2bc7c5bcd0b822785a50f59099ad715
Parents: 4154490
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Tue Sep 20 13:53:55 2016 +0200
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Tue Dec 20 19:05:49 2016 +0100

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |  10 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    |  48 ++++--
 .../storm/kafka/spout/KafkaSpoutConfig.java     |   8 +-
 .../kafka/spout/SerializableDeserializer.java   |  25 +++
 .../spout/internal/KafkaConsumerFactory.java    |  27 +++
 .../internal/KafkaConsumerFactoryDefault.java   |  29 ++++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 166 +++++++++++++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  35 ++--
 .../builders/TopicKeyValueTupleBuilder.java     |   1 +
 9 files changed, 318 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 2d113fa..0878fdf 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -70,13 +70,19 @@
         <!--test dependencies -->
         <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+            <version>${mockito.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.11</version>
             <scope>test</scope>
         </dependency>
         <dependency>

http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 38095fe..245ea1a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -52,6 +52,8 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 
 import org.apache.kafka.common.errors.InterruptException;
 
@@ -64,6 +66,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private final KafkaConsumerFactory kafkaConsumerFactory;
     private transient KafkaConsumer<K, V> kafkaConsumer;
     private transient boolean consumerAutoCommitMode;
 
@@ -86,8 +89,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+    }
+    
+    //This constructor is here for testing
+    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
         this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+        this.kafkaConsumerFactory = kafkaConsumerFactory;
     }
 
     @Override
@@ -147,6 +156,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             }
 
             retryService.retainAll(partitions);
+            
+            //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
+            Set<TopicPartition> partitionsSet = new HashSet(partitions);
+            emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
 
             for (TopicPartition tp : partitions) {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
@@ -301,16 +314,19 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
         } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
-        } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
-            final List<Object> tuple = tuplesBuilder.buildTuple(record);
-            kafkaSpoutStreams.emit(collector, tuple, msgId);
-            emitted.add(msgId);
-            numUncommittedOffsets++;
-            if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
-                retryService.remove(msgId);  // re-emitted hence remove from failed
+        } else {
+            boolean isScheduled = retryService.isScheduled(msgId);
+            if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+                final List<Object> tuple = tuplesBuilder.buildTuple(record);
+                kafkaSpoutStreams.emit(collector, tuple, msgId);
+                emitted.add(msgId);
+                numUncommittedOffsets++;
+                if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
+                    retryService.remove(msgId);
+                }
+                LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+                return true;
             }
-            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
-            return true;
         }
         return false;
     }
@@ -345,6 +361,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void ack(Object messageId) {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+        if(!emitted.contains(msgId)) {
+            LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+            return;
+        }
+        
         if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
             acked.get(msgId.getTopicPartition()).add(msgId);
         }
@@ -356,8 +377,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void fail(Object messageId) {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-        emitted.remove(msgId);
+        if(!emitted.contains(msgId)) {
+            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+            return;
+        }
         if (msgId.numFails() < maxRetries) {
+            emitted.remove(msgId);
             msgId.incrementNumFails();
             retryService.schedule(msgId);
         } else { // limit to max number of retries
@@ -378,8 +403,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private void subscribeKafkaConsumer() {
-        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
-                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+        kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
 
         if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
             final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();

http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 7c97ac9..8aa525b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -105,8 +105,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     public static class Builder<K,V> {
         private final Map<String, Object> kafkaProps;
-        private Deserializer<K> keyDeserializer;
-        private Deserializer<V> valueDeserializer;
+        private SerializableDeserializer<K> keyDeserializer;
+        private SerializableDeserializer<V> valueDeserializer;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
         private int maxRetries = DEFAULT_MAX_RETRIES;
@@ -164,7 +164,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
          * Specifying this key deserializer overrides the property key.deserializer
          */
-        public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
+        public Builder<K,V> setKeyDeserializer(SerializableDeserializer<K> keyDeserializer) {
             this.keyDeserializer = keyDeserializer;
             return this;
         }
@@ -172,7 +172,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
          * Specifying this value deserializer overrides the property value.deserializer
          */
-        public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
+        public Builder<K,V> setValueDeserializer(SerializableDeserializer<V> valueDeserializer) {
             this.valueDeserializer = valueDeserializer;
             return this;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
new file mode 100644
index 0000000..eb76a90
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * @param <T> The type this deserializer deserializes to.
+ */
+public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable { 
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
new file mode 100644
index 0000000..0b253b4
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+/**
+ * This is here to enable testing
+ */
+public interface KafkaConsumerFactory<K, V> extends Serializable {
+    public KafkaConsumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
new file mode 100644
index 0000000..7900388
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {
+
+    @Override
+    public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
new file mode 100644
index 0000000..3e077ab
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.mockito.Captor;
+import static org.mockito.Mockito.reset;
+import org.mockito.MockitoAnnotations;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class KafkaSpoutRebalanceTest {
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private TopologyContext contextMock;
+    private SpoutOutputCollector collectorMock;
+    private Map conf;
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaConsumerFactory<String, String> consumerFactoryMock;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        contextMock = mock(TopologyContext.class);
+        collectorMock = mock(SpoutOutputCollector.class);
+        conf = new HashMap<>();
+        consumerMock = mock(KafkaConsumer.class);
+        consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+    }
+
+    //Returns messageIds in order of emission
+    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
+        //Setup spout with mock consumer so we can get at the rebalance listener
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
+        assignedPartitions.add(partitionThatWillBeRevoked);
+        assignedPartitions.add(assignedPartition);
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        //Make the consumer return a single message for each partition
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
+        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
+        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
+        when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords(firstPartitionRecords))
+                .thenReturn(new ConsumerRecords(secondPartitionRecords))
+                .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+
+        //Emit the messages
+        spout.nextTuple();
+        ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
+        reset(collectorMock);
+        spout.nextTuple();
+        ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
+
+        //Now rebalance
+        consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+        
+        List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
+        emittedMessageIds.add(messageIdForRevokedPartition.getValue());
+        emittedMessageIds.add(messageIdForAssignedPartition.getValue());
+        return emittedMessageIds;
+    }
+
+    @Test
+    public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+        //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
+        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+        TopicPartition assignedPartition = new TopicPartition(topic, 2);
+        
+        //Emit a message on each partition and revoke the first partition
+        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+        
+        //Ack both emitted tuples
+        spout.ack(emittedMessageIds.get(0));
+        spout.ack(emittedMessageIds.get(1));
+
+        //Ensure the commit timer has expired
+        Thread.sleep(510);
+
+        //Make the spout commit any acked tuples
+        spout.nextTuple();
+        //Verify that it only committed the message on the assigned partition
+        verify(consumerMock).commitSync(commitCapture.capture());
+
+        Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+        assertThat(commitCaptureMap, hasKey(assignedPartition));
+        assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+    }
+    
+    @Test
+    public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+        //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+        KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
+        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+        TopicPartition assignedPartition = new TopicPartition(topic, 2);
+        
+        //Emit a message on each partition and revoke the first partition
+        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+        
+        //Fail both emitted tuples
+        spout.fail(emittedMessageIds.get(0));
+        spout.fail(emittedMessageIds.get(1));
+        
+        //Check that only the tuple on the currently assigned partition is retried
+        verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
+        verify(retryServiceMock).schedule(emittedMessageIds.get(1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index baece93..95b2199 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -30,6 +30,7 @@ import java.util.Map;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
 public class SingleTopicKafkaSpoutConfiguration {
+
     public static final String STREAM = "test_stream";
     public static final String TOPIC = "test";
 
@@ -46,22 +47,30 @@ public class SingleTopicKafkaSpoutConfiguration {
         return tp.createTopology();
     }
 
-    public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
-        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .setPollTimeoutMs(1000)
-                .build();
+    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
+        return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+    }
+
+    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
+        return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+    }
+
+    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+        return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .setPollTimeoutMs(1000)
+            .build();
     }
 
     protected static KafkaSpoutRetryService getRetryService() {
         return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
-                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+            KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
 
     }
 
-    protected static Map<String,Object> getKafkaConsumerProps(int port) {
+    protected static Map<String, Object> getKafkaConsumerProps(int port) {
         Map<String, Object> props = new HashMap<>();
         props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
         props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
@@ -73,13 +82,13 @@ public class SingleTopicKafkaSpoutConfiguration {
 
     protected static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
         return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-                new TopicKeyValueTupleBuilder<String, String>(TOPIC))
-                .build();
+            new TopicKeyValueTupleBuilder<String, String>(TOPIC))
+            .build();
     }
 
     public static KafkaSpoutStreams getKafkaSpoutStreams() {
         final Fields outputFields = new Fields("topic", "key", "value");
-        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC})  // contents of topics test sent to test_stream
-                .build();
+        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC}) // contents of topics test sent to test_stream
+            .build();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a92b7d0a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
index 4f20b58..492b79d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
@@ -1,3 +1,4 @@
+
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  *   or more contributor license agreements.  See the NOTICE file


[2/4] storm git commit: STORM-2104: Rewrite a few parts to be compatible with Java 7

Posted by ka...@apache.org.
STORM-2104: Rewrite a few parts to be compatible with Java 7


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3e225b48
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3e225b48
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3e225b48

Branch: refs/heads/1.x-branch
Commit: 3e225b480cb83e60309893a10b52cd0dfaa7f673
Parents: a92b7d0
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Thu Dec 8 19:34:07 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Tue Dec 20 19:10:45 2016 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  8 ++++++-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 24 ++++++++++++++++----
 .../builders/TopicKeyValueTupleBuilder.java     |  1 -
 3 files changed, 26 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3e225b48/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 245ea1a..737c810 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -159,7 +159,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             
             //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
             Set<TopicPartition> partitionsSet = new HashSet(partitions);
-            emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
+            Iterator<KafkaSpoutMessageId> msgIdIterator = emitted.iterator();
+            while (msgIdIterator.hasNext()) {
+                KafkaSpoutMessageId msgId = msgIdIterator.next();
+                if (!partitionsSet.contains(msgId.getTopicPartition())) {
+                    msgIdIterator.remove();
+                }
+            }
 
             for (TopicPartition tp : partitions) {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);

http://git-wip-us.apache.org/repos/asf/storm/blob/3e225b48/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 3e077ab..2d3eb56 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -27,24 +27,33 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
+
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+
 import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.when;
+
 import org.junit.Before;
 import org.mockito.Captor;
+
 import static org.mockito.Mockito.reset;
+
 import org.mockito.MockitoAnnotations;
+
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyList;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
@@ -67,7 +76,12 @@ public class KafkaSpoutRebalanceTest {
         collectorMock = mock(SpoutOutputCollector.class);
         conf = new HashMap<>();
         consumerMock = mock(KafkaConsumer.class);
-        consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+        consumerFactoryMock = new KafkaConsumerFactory<String, String>(){
+            @Override
+            public KafkaConsumer<String, String> createConsumer(KafkaSpoutConfig<String, String> kafkaSpoutConfig) {
+                return consumerMock;
+            } 
+        };
     }
 
     //Returns messageIds in order of emission
@@ -88,9 +102,9 @@ public class KafkaSpoutRebalanceTest {
 
         //Make the consumer return a single message for each partition
         Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
-        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
+        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord<>(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
         Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
-        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
+        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord<>(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
                 .thenReturn(new ConsumerRecords(firstPartitionRecords))
                 .thenReturn(new ConsumerRecords(secondPartitionRecords))
@@ -99,11 +113,11 @@ public class KafkaSpoutRebalanceTest {
         //Emit the messages
         spout.nextTuple();
         ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
+        verify(collectorMock).emit(anyString(), anyList(), messageIdForRevokedPartition.capture());
         reset(collectorMock);
         spout.nextTuple();
         ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
+        verify(collectorMock).emit(anyString(), anyList(), messageIdForAssignedPartition.capture());
 
         //Now rebalance
         consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);

http://git-wip-us.apache.org/repos/asf/storm/blob/3e225b48/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
index 492b79d..4f20b58 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
@@ -1,4 +1,3 @@
-
 /*
  * Licensed to the Apache Software Foundation (ASF) under one
  *   or more contributor license agreements.  See the NOTICE file


[3/4] storm git commit: Merge branch 'STORM-2104-1.x' of https://github.com/srdo/storm into STORM-2104-1.x-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2104-1.x' of https://github.com/srdo/storm into STORM-2104-1.x-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/2e939d2c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/2e939d2c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/2e939d2c

Branch: refs/heads/1.x-branch
Commit: 2e939d2ca84ee34df81aa508bda2f6f09696aa40
Parents: bbc8b64 3e225b4
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Dec 29 08:07:11 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Dec 29 08:07:11 2016 +0900

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |  10 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    |  54 ++++--
 .../storm/kafka/spout/KafkaSpoutConfig.java     |   8 +-
 .../kafka/spout/SerializableDeserializer.java   |  25 +++
 .../spout/internal/KafkaConsumerFactory.java    |  27 +++
 .../internal/KafkaConsumerFactoryDefault.java   |  29 +++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 180 +++++++++++++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  35 ++--
 8 files changed, 337 insertions(+), 31 deletions(-)
----------------------------------------------------------------------



[4/4] storm git commit: STORM-2104: CHANGELOG

Posted by ka...@apache.org.
STORM-2104: CHANGELOG


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/8ab0285d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/8ab0285d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/8ab0285d

Branch: refs/heads/1.x-branch
Commit: 8ab0285d4bd0202eb472ec069ca04d2757e8eae6
Parents: 2e939d2
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Dec 29 09:29:34 2016 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Dec 29 09:29:34 2016 +0900

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/8ab0285d/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 54263d1..657e8f0 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 1.1.0
+ * STORM-2104: New Kafka spout crashes if partitions are reassigned while tuples are in-flight
  * STORM-2257: Add built in support for sum function with different types.
  * STORM-2082: add sql external module storm-sql-hdfs
  * STORM-2256: storm-pmml breaks on java 1.7