Posted to commits@storm.apache.org by bo...@apache.org on 2016/12/08 18:14:35 UTC

[1/5] storm git commit: STORM-2104: More graceful handling of acked/failed tuples after partition reassignment in new Kafka spout

Repository: storm
Updated Branches:
  refs/heads/master e9cc74869 -> 806e3635c


STORM-2104: More graceful handling of acked/failed tuples after partition reassignment in new Kafka spout


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/11fcaff0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/11fcaff0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/11fcaff0

Branch: refs/heads/master
Commit: 11fcaff0e3f807419db2609bab07e0efb47f8df2
Parents: 43bdc03
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Tue Sep 20 13:53:55 2016 +0200
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Fri Nov 4 16:22:22 2016 +0100

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |  10 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    |  46 +++--
 .../storm/kafka/spout/KafkaSpoutConfig.java     |   8 +-
 .../kafka/spout/SerializableDeserializer.java   |  25 +++
 .../spout/internal/KafkaConsumerFactory.java    |  27 +++
 .../internal/KafkaConsumerFactoryDefault.java   |  29 ++++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 166 +++++++++++++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  93 +++++++++++
 .../builders/TopicKeyValueTupleBuilder.java     |  40 +++++
 9 files changed, 427 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index f7a387c..34b9fdc 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -57,13 +57,19 @@
         <!--test dependencies -->
         <dependency>
             <groupId>org.mockito</groupId>
-            <artifactId>mockito-all</artifactId>
+            <artifactId>mockito-core</artifactId>
+            <scope>test</scope>
+            <version>${mockito.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-all</artifactId>
+            <version>1.3</version>
             <scope>test</scope>
         </dependency>
         <dependency>
             <groupId>junit</groupId>
             <artifactId>junit</artifactId>
-            <version>4.11</version>
             <scope>test</scope>
         </dependency>
     </dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 4389acb..5534082 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -52,6 +52,8 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 
 public class KafkaSpout<K, V> extends BaseRichSpout {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
@@ -62,6 +64,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private final KafkaConsumerFactory kafkaConsumerFactory;
     private transient KafkaConsumer<K, V> kafkaConsumer;
     private transient boolean consumerAutoCommitMode;
 
@@ -84,8 +87,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault());
+    }
+    
+    //This constructor is here for testing
+    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
         this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+        this.kafkaConsumerFactory = kafkaConsumerFactory;
     }
 
     @Override
@@ -145,6 +154,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             }
 
             retryService.retainAll(partitions);
+            
+            //Emitted messages for partitions that are no longer assigned to this spout can't be acked, and they shouldn't be retried. Remove them from emitted.
+            Set<TopicPartition> partitionsSet = new HashSet<>(partitions);
+            emitted.removeIf((msgId) -> !partitionsSet.contains(msgId.getTopicPartition()));
 
             for (TopicPartition tp : partitions) {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
@@ -286,15 +299,18 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
         } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
             LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
-        } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
-            final List<Object> tuple = tuplesBuilder.buildTuple(record);
-            kafkaSpoutStreams.emit(collector, tuple, msgId);
-            emitted.add(msgId);
-            numUncommittedOffsets++;
-            if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
-                retryService.remove(msgId);  // re-emitted hence remove from failed
+        } else {
+            boolean isScheduled = retryService.isScheduled(msgId);
+            if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+                final List<Object> tuple = tuplesBuilder.buildTuple(record);
+                kafkaSpoutStreams.emit(collector, tuple, msgId);
+                emitted.add(msgId);
+                numUncommittedOffsets++;
+                if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
+                    retryService.remove(msgId);
+                }
+                LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
             }
-            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
         }
     }
 
@@ -328,6 +344,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void ack(Object messageId) {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+        if(!emitted.contains(msgId)) {
+            LOG.debug("Received ack for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+            return;
+        }
+        
         if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
             acked.get(msgId.getTopicPartition()).add(msgId);
         }
@@ -339,8 +360,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void fail(Object messageId) {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
-        emitted.remove(msgId);
+        if(!emitted.contains(msgId)) {
+            LOG.debug("Received fail for tuple this spout is no longer tracking. Partitions may have been reassigned. Ignoring message [{}]", msgId);
+            return;
+        }
         if (msgId.numFails() < maxRetries) {
+            emitted.remove(msgId);
             msgId.incrementNumFails();
             retryService.schedule(msgId);
         } else { // limit to max number of retries
@@ -357,8 +382,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private void subscribeKafkaConsumer() {
-        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
-                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+        kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
 
         if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
             final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
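
The crux of the change above: on reassignment, the spout's ConsumerRebalanceListener prunes in-flight message ids for partitions it no longer owns, and ack()/fail() then ignore ids that are no longer tracked, since offsets for revoked partitions can no longer be committed from this spout. A minimal standalone sketch of that pattern (a hypothetical EmittedTracker class with simplified names, not the spout's actual code):

    import java.util.Collection;
    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.KafkaSpoutMessageId;

    class EmittedTracker {
        // Message ids emitted but not yet acked or failed.
        private final Set<KafkaSpoutMessageId> emitted = new HashSet<>();

        void track(KafkaSpoutMessageId msgId) {
            emitted.add(msgId);
        }

        // Called when partitions are (re)assigned: drop ids for partitions this
        // consumer no longer owns, since their offsets cannot be committed here.
        void retainAssigned(Collection<TopicPartition> assigned) {
            Set<TopicPartition> assignedSet = new HashSet<>(assigned);
            emitted.removeIf(msgId -> !assignedSet.contains(msgId.getTopicPartition()));
        }

        // Guard used by ack()/fail(): ignore ids that are no longer tracked.
        boolean isTracked(KafkaSpoutMessageId msgId) {
            return emitted.contains(msgId);
        }
    }

In the spout itself this logic lives in onPartitionsAssigned (the removeIf in the diff above) and in the guards at the top of ack() and fail().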

http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 7c97ac9..8aa525b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -105,8 +105,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     public static class Builder<K,V> {
         private final Map<String, Object> kafkaProps;
-        private Deserializer<K> keyDeserializer;
-        private Deserializer<V> valueDeserializer;
+        private SerializableDeserializer<K> keyDeserializer;
+        private SerializableDeserializer<V> valueDeserializer;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
         private int maxRetries = DEFAULT_MAX_RETRIES;
@@ -164,7 +164,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
          * Specifying this key deserializer overrides the property key.deserializer
          */
-        public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
+        public Builder<K,V> setKeyDeserializer(SerializableDeserializer<K> keyDeserializer) {
             this.keyDeserializer = keyDeserializer;
             return this;
         }
@@ -172,7 +172,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         /**
          * Specifying this value deserializer overrides the property value.deserializer
          */
-        public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
+        public Builder<K,V> setValueDeserializer(SerializableDeserializer<V> valueDeserializer) {
             this.valueDeserializer = valueDeserializer;
             return this;
         }

http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
new file mode 100644
index 0000000..eb76a90
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SerializableDeserializer.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import org.apache.kafka.common.serialization.Deserializer;
+
+/**
+ * @param <T> The type this deserializer deserializes to.
+ */
+public interface SerializableDeserializer<T> extends Deserializer<T>, Serializable { 
+}
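
With the builder now typed to SerializableDeserializer (see the KafkaSpoutConfig diff above), any custom deserializer handed to the spout must be Serializable, because the spout and its config are serialized when the topology is submitted. A minimal sketch of implementing the interface (a hypothetical Utf8StringDeserializer; the Kafka Deserializer contract of this era requires all three methods):

    import java.nio.charset.StandardCharsets;
    import java.util.Map;
    import org.apache.storm.kafka.spout.SerializableDeserializer;

    public class Utf8StringDeserializer implements SerializableDeserializer<String> {
        @Override
        public void configure(Map<String, ?> configs, boolean isKey) {
            // No configuration needed for this sketch.
        }

        @Override
        public String deserialize(String topic, byte[] data) {
            return data == null ? null : new String(data, StandardCharsets.UTF_8);
        }

        @Override
        public void close() {
            // Nothing to release.
        }
    }

An instance can then be passed to KafkaSpoutConfig.Builder.setKeyDeserializer or setValueDeserializer in place of Kafka's StringDeserializer, which does not implement Serializable.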

http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
new file mode 100644
index 0000000..0b253b4
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
@@ -0,0 +1,27 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+/**
+ * This is here to enable testing
+ */
+public interface KafkaConsumerFactory<K, V> extends Serializable {
+    public KafkaConsumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
new file mode 100644
index 0000000..7900388
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {
+
+    @Override
+    public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+    }
+    
+}
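
Since KafkaConsumerFactory has a single abstract method, tests can inject a mocked consumer with a plain lambda, exactly as KafkaSpoutRebalanceTest below does. A short sketch (a fragment, assuming it runs inside the org.apache.storm.kafka.spout package because the two-argument KafkaSpout constructor is package-private, and that spoutConfig is a previously built KafkaSpoutConfig<String, String>):

    import static org.mockito.Mockito.mock;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;

    KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
    KafkaConsumerFactory<String, String> factory = kafkaSpoutConfig -> consumerMock;
    KafkaSpout<String, String> spout = new KafkaSpout<>(spoutConfig, factory);

Production code never needs to touch this; the public single-argument constructor wires in KafkaConsumerFactoryDefault.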

http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
new file mode 100644
index 0000000..b5428f9
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -0,0 +1,166 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.when;
+import org.junit.Before;
+import org.mockito.Captor;
+import static org.mockito.Mockito.reset;
+import org.mockito.MockitoAnnotations;
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+
+public class KafkaSpoutRebalanceTest {
+
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+
+    private TopologyContext contextMock;
+    private SpoutOutputCollector collectorMock;
+    private Map conf;
+    private KafkaConsumer<String, String> consumerMock;
+    private KafkaConsumerFactory<String, String> consumerFactoryMock;
+
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
+        contextMock = mock(TopologyContext.class);
+        collectorMock = mock(SpoutOutputCollector.class);
+        conf = new HashMap<>();
+        consumerMock = mock(KafkaConsumer.class);
+        consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
+    }
+
+    //Returns messageIds in order of emission
+    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
+        //Setup spout with mock consumer so we can get at the rebalance listener
+        spout.open(conf, contextMock, collectorMock);
+        spout.activate();
+
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
+
+        //Assign partitions to the spout
+        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
+        List<TopicPartition> assignedPartitions = new ArrayList<>();
+        assignedPartitions.add(partitionThatWillBeRevoked);
+        assignedPartitions.add(assignedPartition);
+        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
+
+        //Make the consumer return a single message for each partition
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
+        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
+        Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
+        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
+        when(consumerMock.poll(anyLong()))
+                .thenReturn(new ConsumerRecords(firstPartitionRecords))
+                .thenReturn(new ConsumerRecords(secondPartitionRecords))
+                .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+
+        //Emit the messages
+        spout.nextTuple();
+        ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
+        reset(collectorMock);
+        spout.nextTuple();
+        ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
+        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
+
+        //Now rebalance
+        consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
+        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
+        
+        List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
+        emittedMessageIds.add(messageIdForRevokedPartition.getValue());
+        emittedMessageIds.add(messageIdForAssignedPartition.getValue());
+        return emittedMessageIds;
+    }
+
+    @Test
+    public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+        //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
+        String topic = SingleTopicKafkaSpoutConfiguration.topic;
+        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+        TopicPartition assignedPartition = new TopicPartition(topic, 2);
+        
+        //Emit a message on each partition and revoke the first partition
+        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+        
+        //Ack both emitted tuples
+        spout.ack(emittedMessageIds.get(0));
+        spout.ack(emittedMessageIds.get(1));
+
+        //Ensure the commit timer has expired
+        Thread.sleep(510);
+
+        //Make the spout commit any acked tuples
+        spout.nextTuple();
+        //Verify that it only committed the message on the assigned partition
+        verify(consumerMock).commitSync(commitCapture.capture());
+
+        Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
+        assertThat(commitCaptureMap, hasKey(assignedPartition));
+        assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
+    }
+    
+    @Test
+    public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
+        //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+        KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+        String topic = SingleTopicKafkaSpoutConfiguration.topic;
+        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
+        TopicPartition assignedPartition = new TopicPartition(topic, 2);
+        
+        //Emit a message on each partition and revoke the first partition
+        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+        
+        //Fail both emitted tuples
+        spout.fail(emittedMessageIds.get(0));
+        spout.fail(emittedMessageIds.get(1));
+        
+        //Check that only the tuple on the currently assigned partition is retried
+        verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
+        verify(retryServiceMock).schedule(emittedMessageIds.get(1));
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
new file mode 100644
index 0000000..157543c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.builders;
+
+import org.apache.storm.Config;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.*;
+import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+public class SingleTopicKafkaSpoutConfiguration {
+    public static final String stream = "test_stream";
+    public static final String topic = "test";
+
+    static public Config getConfig() {
+        Config config = new Config();
+        config.setDebug(true);
+        return config;
+    }
+
+    static public StormTopology getTopologyKafkaSpout(int port) {
+        final TopologyBuilder tp = new TopologyBuilder();
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), port)), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", stream);
+        return tp.createTopology();
+    }
+
+    static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
+        return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+    }
+    
+    static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
+        return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+    }
+    
+    static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+        return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
+                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+                .setFirstPollOffsetStrategy(EARLIEST)
+                .setMaxUncommittedOffsets(250)
+                .setPollTimeoutMs(1000)
+                .build();
+    }
+
+    static protected KafkaSpoutRetryService getRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
+                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
+    }
+
+    static protected Map<String,Object> getKafkaConsumerProps(int port) {
+        Map<String, Object> props = new HashMap<>();
+        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
+        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put("max.poll.records", "5");
+        return props;
+    }
+
+    static protected KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
+                new TopicKeyValueTupleBuilder<String, String>(topic))
+                .build();
+    }
+
+    static public KafkaSpoutStreams getKafkaSpoutStreams() {
+        final Fields outputFields = new Fields("topic", "key", "value");
+        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, stream, new String[]{topic})  // contents of topics test sent to test_stream
+                .build();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/11fcaff0/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
new file mode 100644
index 0000000..d6b8a2a
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.builders;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicKeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public TopicKeyValueTupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        return new Values(consumerRecord.topic(),
+                consumerRecord.key(),
+                consumerRecord.value());
+    }
+}
\ No newline at end of file


[2/5] storm git commit: Merge branch 'master' of https://github.com/apache/storm into STORM-2104

Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/apache/storm into STORM-2104


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/bd1f5c54
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/bd1f5c54
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/bd1f5c54

Branch: refs/heads/master
Commit: bd1f5c54752f67b484a83c26667331234234d3a3
Parents: 11fcaff a912cf3
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Sun Nov 20 12:49:49 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Sun Nov 20 13:01:22 2016 +0100

----------------------------------------------------------------------
 CHANGELOG.md                                    |  16 +
 examples/storm-kafka-client-examples/pom.xml    | 134 +++
 .../TridentKafkaClientWordCountNamedTopics.java | 156 ++++
 ...identKafkaClientWordCountWildcardTopics.java |  46 ++
 examples/storm-kafka-examples/pom.xml           |  18 +
 .../storm/kafka/TridentKafkaTopology.java       |  91 ---
 .../kafka/trident/KafkaProducerTopology.java    |  67 ++
 .../storm/kafka/trident/LocalSubmitter.java     |  91 +++
 .../trident/TridentKafkaConsumerTopology.java   |  94 +++
 .../kafka/trident/TridentKafkaTopology.java     |  89 ++
 .../kafka/trident/TridentKafkaWordCount.java    | 140 ++++
 examples/storm-starter/pom.xml                  |  16 -
 .../starter/trident/DebugMemoryMapState.java    |   2 +-
 .../starter/trident/TridentKafkaWordCount.java  | 278 -------
 external/sql/pom.xml                            |   1 +
 .../apache/storm/sql/parser/SqlCreateTable.java |  20 +-
 .../test/org/apache/storm/sql/TestStormSql.java |  11 +-
 .../storm-sql-external/storm-sql-kafka/pom.xml  |   4 -
 .../sql/kafka/KafkaDataSourcesProvider.java     |  63 +-
 .../sql/kafka/TestKafkaDataSourcesProvider.java |  31 +-
 .../storm-sql-mongodb/pom.xml                   |  84 ++
 .../sql/mongodb/MongoDataSourcesProvider.java   | 121 +++
 ...apache.storm.sql.runtime.DataSourcesProvider |  16 +
 .../mongodb/TestMongoDataSourcesProvider.java   | 122 +++
 .../storm-sql-external/storm-sql-redis/pom.xml  |   4 -
 .../sql/redis/RedisDataSourcesProvider.java     |  48 +-
 .../sql/redis/TestRedisDataSourcesProvider.java |  26 +-
 external/sql/storm-sql-runtime/pom.xml          |  14 +
 .../storm/sql/runtime/DataSourcesProvider.java  |   5 +-
 .../storm/sql/runtime/DataSourcesRegistry.java  |   5 +-
 .../sql/runtime/serde/avro/AvroScheme.java      |  74 ++
 .../sql/runtime/serde/avro/AvroSerializer.java  |  72 ++
 .../sql/runtime/serde/avro/CachedSchemas.java   |  41 +
 .../storm/sql/runtime/serde/csv/CsvScheme.java  |  70 ++
 .../sql/runtime/serde/csv/CsvSerializer.java    |  59 ++
 .../sql/runtime/serde/json/JsonScheme.java      |   2 +-
 .../storm/sql/runtime/serde/tsv/TsvScheme.java  |  58 ++
 .../sql/runtime/serde/tsv/TsvSerializer.java    |  54 ++
 .../storm/sql/runtime/utils/FieldInfoUtils.java |  39 +
 .../storm/sql/runtime/utils/SerdeUtils.java     | 123 +++
 .../apache/storm/sql/runtime/utils/Utils.java   |  55 ++
 .../apache/storm/sql/TestAvroSerializer.java    |  72 ++
 .../org/apache/storm/sql/TestCsvSerializer.java |  54 ++
 .../org/apache/storm/sql/TestTsvSerializer.java |  46 ++
 .../storm/hdfs/bolt/AbstractHdfsBolt.java       |  25 +-
 external/storm-kafka-client/pom.xml             |  26 +
 .../apache/storm/kafka/spout/KafkaSpout.java    |  31 +-
 .../storm/kafka/spout/KafkaSpoutStream.java     |   2 +-
 .../storm/kafka/spout/KafkaSpoutStreams.java    |   3 +
 .../spout/KafkaSpoutStreamsNamedTopics.java     |  11 +
 .../spout/KafkaSpoutStreamsWildcardTopics.java  |   6 +
 .../trident/KafkaTridentSpoutBatchMetadata.java |  83 ++
 .../spout/trident/KafkaTridentSpoutEmitter.java | 197 +++++
 .../spout/trident/KafkaTridentSpoutManager.java | 119 +++
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  80 ++
 .../KafkaTridentSpoutOpaqueCoordinator.java     |  64 ++
 .../KafkaTridentSpoutTopicPartition.java        |  68 ++
 ...KafkaTridentSpoutTopicPartitionRegistry.java |  48 ++
 .../trident/KafkaTridentSpoutTransactional.java |  48 ++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |   4 +-
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  | 240 ++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  30 +-
 .../builders/TopicKeyValueTupleBuilder.java     |   2 +-
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |   5 +-
 pom.xml                                         |   1 +
 .../org/apache/storm/daemon/local_executor.clj  |  42 -
 .../src/clj/org/apache/storm/daemon/worker.clj  | 818 -------------------
 storm-core/src/clj/org/apache/storm/testing.clj |  28 +-
 .../src/jvm/org/apache/storm/Constants.java     |   6 +-
 .../src/jvm/org/apache/storm/daemon/Task.java   |  46 +-
 .../storm/daemon/supervisor/AdvancedFSOps.java  |   9 +-
 .../storm/daemon/supervisor/BasicContainer.java |   2 +-
 .../storm/daemon/supervisor/LocalContainer.java |  11 +-
 .../daemon/supervisor/ReadClusterState.java     |   2 +-
 .../storm/daemon/worker/LogConfigManager.java   | 156 ++++
 .../org/apache/storm/daemon/worker/Worker.java  | 454 ++++++++++
 .../apache/storm/daemon/worker/WorkerState.java | 691 ++++++++++++++++
 .../jvm/org/apache/storm/executor/Executor.java |  55 +-
 .../apache/storm/executor/ExecutorShutdown.java |   5 +-
 .../apache/storm/executor/ExecutorTransfer.java |  15 +-
 .../apache/storm/executor/IRunningExecutor.java |   2 +-
 .../apache/storm/executor/LocalExecutor.java    |  56 ++
 .../storm/executor/bolt/BoltExecutor.java       |  10 +-
 .../storm/executor/spout/SpoutExecutor.java     |   3 +-
 .../DeserializingConnectionCallback.java        |  10 +-
 .../storm/scheduler/resource/RAS_Node.java      |   4 +-
 .../org/apache/storm/security/auth/AutoSSL.java | 161 ++++
 .../storm/security/auth/ThriftClient.java       |  13 +-
 .../org/apache/storm/task/TopologyContext.java  |   3 +
 .../org/apache/storm/utils/TransferDrainer.java |  26 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   | 116 ++-
 .../apache/storm/messaging/netty_unit_test.clj  |  13 +-
 .../test/clj/org/apache/storm/nimbus_test.clj   |   6 +-
 .../test/clj/org/apache/storm/worker_test.clj   | 205 -----
 .../daemon/supervisor/BasicContainerTest.java   |   2 +-
 .../daemon/worker/LogConfigManagerTest.java     | 219 +++++
 .../apache/storm/daemon/worker/WorkerTest.java  |  39 +
 .../apache/storm/security/auth/AutoSSLTest.java | 136 +++
 storm-dist/binary/src/main/assembly/binary.xml  |   7 +
 99 files changed, 5299 insertions(+), 1797 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/pom.xml
index 34b9fdc,2d9a844..2b56336
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@@ -70,8 -77,21 +84,20 @@@
          <dependency>
              <groupId>junit</groupId>
              <artifactId>junit</artifactId>
 -            <version>4.11</version>
              <scope>test</scope>
          </dependency>
+         <dependency>
+             <groupId>info.batey.kafka</groupId>
+             <artifactId>kafka-unit</artifactId>
+             <version>0.6</version>
+             <scope>test</scope>
+         </dependency>
+         <dependency>
+             <groupId>org.slf4j</groupId>
+             <artifactId>log4j-over-slf4j</artifactId>
+             <version>${log4j-over-slf4j.version}</version>
+             <scope>test</scope>
+         </dependency>
      </dependencies>
  
      <build>

http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 5534082,439492b..ce01a76
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@@ -299,19 -289,18 +302,21 @@@ public class KafkaSpout<K, V> extends B
              LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
          } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
              LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
 -        } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
 -            final List<Object> tuple = tuplesBuilder.buildTuple(record);
 -            kafkaSpoutStreams.emit(collector, tuple, msgId);
 -            emitted.add(msgId);
 -            numUncommittedOffsets++;
 -            if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
 -                retryService.remove(msgId);  // re-emitted hence remove from failed
 +        } else {
 +            boolean isScheduled = retryService.isScheduled(msgId);
 +            if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
 +                final List<Object> tuple = tuplesBuilder.buildTuple(record);
 +                kafkaSpoutStreams.emit(collector, tuple, msgId);
 +                emitted.add(msgId);
 +                numUncommittedOffsets++;
 +                if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
 +                    retryService.remove(msgId);
 +                }
 +                LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
++               return true;
              }
 -            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
 -            return true;
          }
+         return false;
      }
  
      private void commitOffsetsForAckedTuples() {

http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index b5428f9,0000000..3e077ab
mode 100644,000000..100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@@ -1,166 -1,0 +1,166 @@@
 +/*
 + * Copyright 2016 The Apache Software Foundation.
 + *
 + * Licensed under the Apache License, Version 2.0 (the "License");
 + * you may not use this file except in compliance with the License.
 + * You may obtain a copy of the License at
 + *
 + *      http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +package org.apache.storm.kafka.spout;
 +
 +import java.util.ArrayList;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 +import org.apache.kafka.clients.consumer.ConsumerRecord;
 +import org.apache.kafka.clients.consumer.ConsumerRecords;
 +import org.apache.kafka.clients.consumer.KafkaConsumer;
 +import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 +import org.apache.kafka.common.TopicPartition;
 +import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
 +import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 +import org.apache.storm.spout.SpoutOutputCollector;
 +import org.apache.storm.task.TopologyContext;
 +import org.junit.Test;
 +import org.mockito.ArgumentCaptor;
 +import static org.mockito.Matchers.anyCollection;
 +import static org.mockito.Matchers.anyLong;
 +import static org.mockito.Matchers.anyObject;
 +import static org.mockito.Mockito.when;
 +import org.junit.Before;
 +import org.mockito.Captor;
 +import static org.mockito.Mockito.reset;
 +import org.mockito.MockitoAnnotations;
 +import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
 +import static org.hamcrest.CoreMatchers.not;
 +import static org.hamcrest.Matchers.hasKey;
 +import static org.junit.Assert.assertThat;
 +import static org.mockito.Mockito.mock;
 +import static org.mockito.Mockito.never;
 +import static org.mockito.Mockito.verify;
 +
 +public class KafkaSpoutRebalanceTest {
 +
 +    @Captor
 +    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 +
 +    private TopologyContext contextMock;
 +    private SpoutOutputCollector collectorMock;
 +    private Map conf;
 +    private KafkaConsumer<String, String> consumerMock;
 +    private KafkaConsumerFactory<String, String> consumerFactoryMock;
 +
 +    @Before
 +    public void setUp() {
 +        MockitoAnnotations.initMocks(this);
 +        contextMock = mock(TopologyContext.class);
 +        collectorMock = mock(SpoutOutputCollector.class);
 +        conf = new HashMap<>();
 +        consumerMock = mock(KafkaConsumer.class);
 +        consumerFactoryMock = (kafkaSpoutConfig) -> consumerMock;
 +    }
 +
 +    //Returns messageIds in order of emission
 +    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
 +        //Setup spout with mock consumer so we can get at the rebalance listener
 +        spout.open(conf, contextMock, collectorMock);
 +        spout.activate();
 +
 +        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
 +        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
 +
 +        //Assign partitions to the spout
 +        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
 +        List<TopicPartition> assignedPartitions = new ArrayList<>();
 +        assignedPartitions.add(partitionThatWillBeRevoked);
 +        assignedPartitions.add(assignedPartition);
 +        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
 +
 +        //Make the consumer return a single message for each partition
 +        Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPartitionRecords = new HashMap<>();
 +        firstPartitionRecords.put(partitionThatWillBeRevoked, Collections.singletonList(new ConsumerRecord(partitionThatWillBeRevoked.topic(), partitionThatWillBeRevoked.partition(), 0L, "key", "value")));
 +        Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
 +        secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
 +        when(consumerMock.poll(anyLong()))
 +                .thenReturn(new ConsumerRecords(firstPartitionRecords))
 +                .thenReturn(new ConsumerRecords(secondPartitionRecords))
 +                .thenReturn(new ConsumerRecords(Collections.emptyMap()));
 +
 +        //Emit the messages
 +        spout.nextTuple();
 +        ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
 +        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForRevokedPartition.capture());
 +        reset(collectorMock);
 +        spout.nextTuple();
 +        ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
 +        verify(collectorMock).emit(anyObject(), anyObject(), messageIdForAssignedPartition.capture());
 +
 +        //Now rebalance
 +        consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
 +        consumerRebalanceListener.onPartitionsAssigned(Collections.singleton(assignedPartition));
 +        
 +        List<KafkaSpoutMessageId> emittedMessageIds = new ArrayList<>();
 +        emittedMessageIds.add(messageIdForRevokedPartition.getValue());
 +        emittedMessageIds.add(messageIdForAssignedPartition.getValue());
 +        return emittedMessageIds;
 +    }
 +
 +    @Test
 +    public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
 +        //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
 +        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
-         String topic = SingleTopicKafkaSpoutConfiguration.topic;
++        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
 +        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
 +        TopicPartition assignedPartition = new TopicPartition(topic, 2);
 +        
 +        //Emit a message on each partition and revoke the first partition
 +        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
 +        
 +        //Ack both emitted tuples
 +        spout.ack(emittedMessageIds.get(0));
 +        spout.ack(emittedMessageIds.get(1));
 +
 +        //Ensure the commit timer has expired
 +        Thread.sleep(510);
 +
 +        //Make the spout commit any acked tuples
 +        spout.nextTuple();
 +        //Verify that it only committed the message on the assigned partition
 +        verify(consumerMock).commitSync(commitCapture.capture());
 +
 +        Map<TopicPartition, OffsetAndMetadata> commitCaptureMap = commitCapture.getValue();
 +        assertThat(commitCaptureMap, hasKey(assignedPartition));
 +        assertThat(commitCaptureMap, not(hasKey(partitionThatWillBeRevoked)));
 +    }
 +    
 +    @Test
 +    public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
 +        //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
 +        KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
 +        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
-         String topic = SingleTopicKafkaSpoutConfiguration.topic;
++        String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
 +        TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
 +        TopicPartition assignedPartition = new TopicPartition(topic, 2);
 +        
 +        //Emit a message on each partition and revoke the first partition
 +        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
 +        
 +        //Fail both emitted tuples
 +        spout.fail(emittedMessageIds.get(0));
 +        spout.fail(emittedMessageIds.get(1));
 +        
 +        //Check that only the tuple on the currently assigned partition is retried
 +        verify(retryServiceMock, never()).schedule(emittedMessageIds.get(0));
 +        verify(retryServiceMock).schedule(emittedMessageIds.get(1));
 +    }
 +}

http://git-wip-us.apache.org/repos/asf/storm/blob/bd1f5c54/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 157543c,baece93..6921f7c
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@@ -46,17 -46,9 +46,17 @@@ public class SingleTopicKafkaSpoutConfi
          return tp.createTopology();
      }
  
-     static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
 -    public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
 -        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
 -                .setOffsetCommitPeriodMs(10_000)
++    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
 +        return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
 +    }
-     
-     static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
++
++    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
 +        return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
 +    }
 +    
 +    static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
 +        return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
 +                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                  .setFirstPollOffsetStrategy(EARLIEST)
                  .setMaxUncommittedOffsets(250)
                  .setPollTimeoutMs(1000)


[5/5] storm git commit: Add STORM-2104 to Changelog

Posted by bo...@apache.org.
Add STORM-2104 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/806e3635
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/806e3635
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/806e3635

Branch: refs/heads/master
Commit: 806e3635c301bf02d4525f1acb837110a87c4e26
Parents: 23899bd
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Dec 8 12:14:07 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Dec 8 12:14:07 2016 -0600

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/806e3635/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 49dfcef..5f73dc7 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2104: More graceful handling of acked/failed tuples after partition reassignment
  * STORM-1281: LocalCluster, testing4j and testing.clj to java
  * STORM-2226: Fix kafka spout offset lag ui for kerberized kafka
  * STORM-1276: line for line translation of nimbus to java


[4/5] storm git commit: Merge branch 'STORM-2104' of https://github.com/srdo/storm into STORM-2104

Posted by bo...@apache.org.
Merge branch 'STORM-2104' of https://github.com/srdo/storm into STORM-2104

STORM-2104: More graceful handling of acked/failed tuples after partition reassignment


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/23899bd8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/23899bd8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/23899bd8

Branch: refs/heads/master
Commit: 23899bd8484b0d3f035436ad626a08a4581a00ee
Parents: e9cc748 29b52e9
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Dec 8 12:02:18 2016 -0600
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Dec 8 12:02:18 2016 -0600

----------------------------------------------------------------------
 external/storm-kafka-client/pom.xml             |   9 +-
 .../apache/storm/kafka/spout/KafkaSpout.java    |  48 ++++--
 .../storm/kafka/spout/KafkaSpoutConfig.java     |   8 +-
 .../kafka/spout/SerializableDeserializer.java   |  25 +++
 .../spout/internal/KafkaConsumerFactory.java    |  27 +++
 .../internal/KafkaConsumerFactoryDefault.java   |  29 ++++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 166 +++++++++++++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  14 +-
 8 files changed, 306 insertions(+), 20 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/23899bd8/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------


[3/5] storm git commit: Merge branch 'master' of https://github.com/apache/storm into STORM-2104

Posted by bo...@apache.org.
Merge branch 'master' of https://github.com/apache/storm into STORM-2104


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/29b52e9c
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/29b52e9c
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/29b52e9c

Branch: refs/heads/master
Commit: 29b52e9c266b130642d99e665ff73c55dfabba13
Parents: bd1f5c5 57998b4
Author: Stig Rohde Døssing <sd...@it-minds.dk>
Authored: Tue Nov 29 20:54:47 2016 +0100
Committer: Stig Rohde Døssing <sd...@it-minds.dk>
Committed: Tue Nov 29 20:54:47 2016 +0100

----------------------------------------------------------------------
 CHANGELOG.md                                    |   9 +
 docs/index.md                                   |  12 +-
 docs/storm-cassandra.md                         |  23 --
 docs/storm-druid.md                             | 119 +++++++++
 docs/storm-kafka-client.md                      | 188 ++++++++++++++
 docs/storm-kinesis.md                           | 136 ++++++++++
 docs/storm-mongodb.md                           |  23 --
 docs/storm-opentsdb.md                          |  52 ++++
 docs/storm-redis.md                             |  24 --
 docs/storm-sql-reference.md                     | 111 +++++++-
 examples/storm-starter/pom.xml                  |   1 +
 external/flux/pom.xml                           |   6 -
 external/storm-cassandra/pom.xml                |   7 -
 external/storm-eventhubs/pom.xml                |   6 -
 external/storm-hdfs/pom.xml                     |   6 -
 .../apache/storm/hdfs/trident/HdfsState.java    |  31 +--
 external/storm-hive/pom.xml                     |   6 -
 external/storm-jdbc/pom.xml                     |   6 -
 external/storm-jms/examples/pom.xml             |   1 -
 external/storm-jms/pom.xml                      |  12 -
 external/storm-kafka-client/pom.xml             |   5 -
 .../storm/kafka/monitor/KafkaOffsetLagUtil.java |   6 +-
 external/storm-kafka/pom.xml                    |   6 -
 .../storm/kafka/DynamicBrokersReader.java       |   8 +-
 .../src/jvm/org/apache/storm/kafka/ZkState.java |   2 +-
 .../storm/kinesis/spout/ZKConnection.java       |   2 +-
 external/storm-metrics/pom.xml                  |   1 -
 external/storm-solr/pom.xml                     |   6 -
 .../maven-shade-clojure-transformer/pom.xml     |   1 -
 storm-buildtools/storm-maven-plugins/pom.xml    |   2 +-
 storm-core/pom.xml                              |   2 +-
 .../storm/cluster/StormClusterStateImpl.java    |   8 +-
 .../jvm/org/apache/storm/daemon/DrpcServer.java |   2 +-
 .../org/apache/storm/daemon/StormCommon.java    |  16 +-
 .../org/apache/storm/drpc/ReturnResults.java    |  14 +-
 .../jvm/org/apache/storm/executor/Executor.java |   7 +-
 .../apache/storm/grouping/ShuffleGrouping.java  |  12 +-
 .../apache/storm/multilang/JsonSerializer.java  |  11 +-
 .../jvm/org/apache/storm/spout/ShellSpout.java  |   1 +
 .../storm/task/GeneralTopologyContext.java      |  11 +-
 .../apache/storm/topology/TopologyBuilder.java  |  12 +-
 .../trident/drpc/ReturnResultsReducer.java      |  11 +-
 .../state/JSONNonTransactionalSerializer.java   |   5 +-
 .../trident/state/JSONOpaqueSerializer.java     |   6 +-
 .../state/JSONTransactionalSerializer.java      |   6 +-
 .../storm/trident/testing/TuplifyArgs.java      |  14 +-
 .../topology/state/TransactionalState.java      |   2 +-
 .../apache/storm/ui/FilterConfiguration.java    |   2 +-
 .../apache/storm/utils/TopologySpoutLag.java    |   7 +-
 .../src/jvm/org/apache/storm/utils/Utils.java   |   4 +-
 .../src/native/worker-launcher/impl/main.c      |   3 +-
 .../worker-launcher/impl/worker-launcher.c      | 260 ++++++-------------
 .../worker-launcher/impl/worker-launcher.h      |  16 +-
 .../storm/grouping/ShuffleGroupingTest.java     | 147 +++++++++++
 storm-dist/binary/pom.xml                       |   2 +-
 storm-dist/source/pom.xml                       |   2 +-
 storm-multilang/javascript/pom.xml              |   2 +-
 storm-multilang/python/pom.xml                  |   2 +-
 storm-multilang/ruby/pom.xml                    |   2 +-
 storm-rename-hack/pom.xml                       |   3 +-
 60 files changed, 982 insertions(+), 428 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/29b52e9c/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --cc external/storm-kafka-client/pom.xml
index 2b56336,a7971e2..97ed359
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@@ -71,19 -71,7 +71,14 @@@
          <!--test dependencies -->
          <dependency>
              <groupId>org.mockito</groupId>
 -            <artifactId>mockito-all</artifactId>
 +            <artifactId>mockito-core</artifactId>
 +            <scope>test</scope>
 +            <version>${mockito.version}</version>
 +        </dependency>
 +        <dependency>
 +            <groupId>org.hamcrest</groupId>
 +            <artifactId>hamcrest-all</artifactId>
 +            <version>1.3</version>
-             <scope>test</scope>
-         </dependency>
-         <dependency>
-             <groupId>junit</groupId>
-             <artifactId>junit</artifactId>
              <scope>test</scope>
          </dependency>
          <dependency>