Posted to commits@storm.apache.org by sr...@apache.org on 2017/07/19 18:25:04 UTC

[1/4] storm git commit: STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option

Repository: storm
Updated Branches:
  refs/heads/master 10d381b30 -> 3580dbc80


http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 2d55520..23630a6 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -19,9 +19,10 @@ import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfigu
 import static org.hamcrest.CoreMatchers.not;
 import static org.hamcrest.Matchers.hasKey;
 import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.doNothing;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.reset;
@@ -42,6 +43,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -74,14 +76,11 @@ public class KafkaSpoutRebalanceTest {
     }
 
     //Returns messageIds in order of emission
-    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition) {
-        //Setup spout with mock consumer so we can get at the rebalance listener
+    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) {
+        //Setup spout with mock consumer so we can get at the rebalance listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
 
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
         List<TopicPartition> assignedPartitions = new ArrayList<>();
@@ -95,9 +94,9 @@ public class KafkaSpoutRebalanceTest {
         Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPartitionRecords = new HashMap<>();
         secondPartitionRecords.put(assignedPartition, Collections.singletonList(new ConsumerRecord(assignedPartition.topic(), assignedPartition.partition(), 0L, "key", "value")));
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords(firstPartitionRecords))
-            .thenReturn(new ConsumerRecords(secondPartitionRecords))
-            .thenReturn(new ConsumerRecords(Collections.emptyMap()));
+            .thenReturn(new ConsumerRecords<>(firstPartitionRecords))
+            .thenReturn(new ConsumerRecords<>(secondPartitionRecords))
+            .thenReturn(new ConsumerRecords<>(Collections.emptyMap()));
 
         //Emit the messages
         spout.nextTuple();
@@ -122,7 +121,12 @@ public class KafkaSpoutRebalanceTest {
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+            ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+            Subscription subscriptionMock = mock(Subscription.class);
+            doNothing()
+                .when(subscriptionMock)
+                .subscribe(any(), rebalanceListenerCapture.capture(), any());
+            KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
                 .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
                 .build(), consumerFactory);
             String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
@@ -130,7 +134,8 @@ public class KafkaSpoutRebalanceTest {
             TopicPartition assignedPartition = new TopicPartition(topic, 2);
 
             //Emit a message on each partition and revoke the first partition
-            List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+            List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+                spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
 
             //Ack both emitted tuples
             spout.ack(emittedMessageIds.get(0));
@@ -152,8 +157,13 @@ public class KafkaSpoutRebalanceTest {
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        Subscription subscriptionMock = mock(Subscription.class);
+        doNothing()
+            .when(subscriptionMock)
+            .subscribe(any(), rebalanceListenerCapture.capture(), any());
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(-1)
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfigBuilder(subscriptionMock, -1)
             .setOffsetCommitPeriodMs(10)
             .setRetry(retryServiceMock)
             .build(), consumerFactory);
@@ -166,7 +176,8 @@ public class KafkaSpoutRebalanceTest {
             .thenReturn(new KafkaSpoutMessageId(assignedPartition, 0));
         
         //Emit a message on each partition and revoke the first partition
-        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(spout, partitionThatWillBeRevoked, assignedPartition);
+        List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
+            spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
 
         //Check that only two message ids were generated
         verify(retryServiceMock, times(2)).getMessageId(anyObject());
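
Since the spout no longer calls KafkaConsumer.subscribe, these tests can no longer capture the rebalance listener from the consumer mock; they capture it from a mocked Subscription instead. A condensed sketch of the stubbing pattern, using only types and calls that appear in this patch:

    ArgumentCaptor<ConsumerRebalanceListener> listenerCaptor =
        ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
    Subscription subscriptionMock = mock(Subscription.class);
    doNothing()
        .when(subscriptionMock)
        .subscribe(any(), listenerCaptor.capture(), any());
    //Open and activate the spout, then drive a rebalance by hand, e.g.:
    //listenerCaptor.getValue().onPartitionsRevoked(previouslyAssignedPartitions);
    //listenerCaptor.getValue().onPartitionsAssigned(newAssignedPartitions);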

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index d84f4da..078f7a1 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -30,80 +30,71 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
 import org.mockito.InOrder;
+import org.mockito.MockitoAnnotations;
 
 public class KafkaSpoutRetryLimitTest {
-
+    
     private final long offsetCommitPeriodMs = 2_000;
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
     private final Map<String, Object> conf = new HashMap<>();
     private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpout<String, String> spout;
-    private KafkaSpoutConfig spoutConfig;
-
+    private KafkaSpoutConfig<String, String> spoutConfig;
+    
     public static final KafkaSpoutRetryService ZERO_RETRIES_RETRY_SERVICE =
-            new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
-                    0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
-    private void setupSpoutWithNoRetry(Set<TopicPartition> assignedPartitions) {
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            0, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+    
+    @Captor
+    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
+    
+    @Before
+    public void setUp() {
+        MockitoAnnotations.initMocks(this);
         spoutConfig = getKafkaSpoutConfigBuilder(-1)
-                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-                .setRetry(ZERO_RETRIES_RETRY_SERVICE)
-                .build();
-
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .setRetry(ZERO_RETRIES_RETRY_SERVICE)
+            .build();
         consumerMock = mock(KafkaConsumer.class);
-        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
-        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-        spout.open(conf, contextMock, collectorMock);
-        spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
-        //Assign partitions to the spout
-        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
     }
-
+    
     @Test
     public void testFailingTupleCompletesAckAfterRetryLimitIsMet() {
         //Spout should ack failed messages after they hit the retry limit
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpoutWithNoRetry(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
             List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
             int lastOffset = 3;
             for (int i = 0; i <= lastOffset; i++) {
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+                recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             records.put(partition, recordsForPartition);
-
+            
             when(consumerMock.poll(anyLong()))
-                    .thenReturn(new ConsumerRecords(records));
-
+                .thenReturn(new ConsumerRecords<>(records));
+            
             for (int i = 0; i < recordsForPartition.size(); i++) {
                 spout.nextTuple();
             }
-
+            
             ArgumentCaptor<KafkaSpoutMessageId> messageIds = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
             verify(collectorMock, times(recordsForPartition.size())).emit(anyObject(), anyObject(), messageIds.capture());
-
+            
             for (KafkaSpoutMessageId messageId : messageIds.getAllValues()) {
                 spout.fail(messageId);
             }
@@ -111,16 +102,15 @@ public class KafkaSpoutRetryLimitTest {
             // Advance time and then trigger call to kafka consumer commit
             Time.advanceTime(KafkaSpout.TIMER_DELAY_MS + offsetCommitPeriodMs);
             spout.nextTuple();
-
-            ArgumentCaptor<Map> committedOffsets=ArgumentCaptor.forClass(Map.class);
+            
             InOrder inOrder = inOrder(consumerMock);
-            inOrder.verify(consumerMock).commitSync(committedOffsets.capture());
+            inOrder.verify(consumerMock).commitSync(commitCapture.capture());
             inOrder.verify(consumerMock).poll(anyLong());
 
             //verify that Offset 3 was committed for the given TopicPartition
-            assertTrue(committedOffsets.getValue().containsKey(partition));
-            assertEquals(lastOffset, ((OffsetAndMetadata) (committedOffsets.getValue().get(partition))).offset());
+            assertTrue(commitCapture.getValue().containsKey(partition));
+            assertEquals(lastOffset, ((OffsetAndMetadata) (commitCapture.getValue().get(partition))).offset());
         }
     }
-
-}
\ No newline at end of file
+    
+}
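
The raw ArgumentCaptor<Map> is replaced by a typed @Captor field, initialized once per test via MockitoAnnotations.initMocks. A minimal sketch of the pattern as used in this test:

    @Captor
    private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;

    @Before
    public void setUp() {
        MockitoAnnotations.initMocks(this); //populates @Captor fields
    }

    //In the test body:
    //verify(consumerMock).commitSync(commitCapture.capture());
    //assertEquals(lastOffset, commitCapture.getValue().get(partition).offset());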

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index 9ebdcf7..261c654 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -22,12 +22,15 @@ import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isIn;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
 import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -58,7 +61,7 @@ public class MaxUncommittedOffsetTest {
     private final int maxUncommittedOffsets = 10;
     private final int maxPollRecords = 5;
     private final int initialRetryDelaySecs = 60;
-    private final KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+    private final KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
         .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
         .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, maxPollRecords)
         .setMaxUncommittedOffsets(maxUncommittedOffsets)
@@ -93,6 +96,8 @@ public class MaxUncommittedOffsetTest {
 
     private void initializeSpout(int msgCount) throws Exception {
         populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        when(topologyContext.getThisTaskIndex()).thenReturn(0);
+        when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
         spout.open(conf, topologyContext, collector);
         spout.activate();
     }
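
The new TopologyContext stubs are needed because, with manual assignment, each spout instance computes its own share of the partitions from its task index and the component's task list. Illustrative only (see RoundRobinManualPartitioner in this patch for the actual logic), the round-robin idea amounts to:

    //taskIndex, totalTaskCount and allPartitions are hypothetical locals
    List<TopicPartition> myPartitions = new ArrayList<>();
    for (int i = taskIndex; i < allPartitions.size(); i += totalTaskCount) {
        myPartitions.add(allPartitions.get(i));
    }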

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
deleted file mode 100644
index e97c7e1..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/NamedTopicFilterTest.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Before;
-import org.junit.Test;
-
-public class NamedTopicFilterTest {
-
-    private KafkaConsumer<?, ?> consumerMock;
-    
-    @Before
-    public void setUp() {
-        consumerMock = mock(KafkaConsumer.class);
-    }
-    
-    @Test
-    public void testFilter() {
-        String matchingTopicOne = "test-1";
-        String matchingTopicTwo = "test-11";
-        String unmatchedTopic = "unmatched";
-        
-        NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo);
-        
-        when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
-        List<PartitionInfo> partitionTwoPartitions = new ArrayList<>();
-        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
-        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
-        when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
-        when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
-        
-        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
-        
-        assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, 
-            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
-            
-    }
-    
-    private PartitionInfo createPartitionInfo(String topic, int partition) {
-        return new PartitionInfo(topic, partition, null, null, null);
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
deleted file mode 100644
index 877efdc..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/PatternTopicFilterTest.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import static org.hamcrest.Matchers.containsInAnyOrder;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-import org.junit.Before;
-import org.junit.Test;
-
-public class PatternTopicFilterTest {
-
-    private KafkaConsumer<?, ?> consumerMock;
-    
-    @Before
-    public void setUp(){
-        consumerMock = mock(KafkaConsumer.class);
-    }
-    
-    @Test
-    public void testFilter() {
-        Pattern pattern = Pattern.compile("test-\\d+");
-        PatternTopicFilter filter = new PatternTopicFilter(pattern);
-        
-        String matchingTopicOne = "test-1";
-        String matchingTopicTwo = "test-11";
-        String unmatchedTopic = "unmatched";
-        
-        Map<String, List<PartitionInfo>> allTopics = new HashMap<>();
-        allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
-        List<PartitionInfo> testTwoPartitions = new ArrayList<>();
-        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
-        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
-        allTopics.put(matchingTopicTwo, testTwoPartitions);
-        allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
-        
-        when(consumerMock.listTopics()).thenReturn(allTopics);
-        
-        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
-        
-        assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions,
-            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
-    }
-    
-    private PartitionInfo createPartitionInfo(String topic, int partition) {
-        return new PartitionInfo(topic, partition, null, null, null);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 7f0973b..6b92de8 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -20,6 +20,7 @@ package org.apache.storm.kafka.spout;
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.mock;
@@ -28,7 +29,9 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
 
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -77,12 +80,12 @@ public class SingleTopicKafkaSpoutTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        KafkaSpoutConfig spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
+        KafkaSpoutConfig<String, String> spoutConfig = getKafkaSpoutConfigBuilder(kafkaUnitRule.getKafkaUnit().getKafkaPort())
             .setOffsetCommitPeriodMs(commitOffsetPeriodMs)
             .setRetry(new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),
                 maxRetries, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0)))
             .build();
-        this.consumerSpy = spy(new KafkaConsumerFactoryDefault().createConsumer(spoutConfig));
+        this.consumerSpy = spy(new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig));
         this.consumerFactory = (kafkaSpoutConfig) -> consumerSpy;
         this.spout = new KafkaSpout<>(spoutConfig, consumerFactory);
     }
@@ -100,6 +103,8 @@ public class SingleTopicKafkaSpoutTest {
 
     private void initializeSpout(int msgCount) throws InterruptedException, ExecutionException, TimeoutException {
         populateTopicData(SingleTopicKafkaSpoutConfiguration.TOPIC, msgCount);
+        when(topologyContext.getThisTaskIndex()).thenReturn(0);
+        when(topologyContext.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
         spout.open(conf, topologyContext, collector);
         spout.activate();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
new file mode 100644
index 0000000..5f931bb
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -0,0 +1,74 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+
+public class SpoutWithMockedConsumerSetupHelper {
+    
+    /**
+     * Creates, opens and activates a KafkaSpout using a mocked consumer.
+     * @param <K> The Kafka key type
+     * @param <V> The Kafka value type
+     * @param spoutConfig The spout config to use
+     * @param topoConf The topo conf to pass to the spout
+     * @param contextMock The topo context to pass to the spout
+     * @param collectorMock The mocked collector to pass to the spout
+     * @param consumerMock The mocked consumer
+     * @param assignedPartitions The partitions to assign to this spout. The consumer will act like these partitions are assigned to it.
+     * @return The spout
+     */
+    public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
+        TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, Set<TopicPartition> assignedPartitions) {     
+
+        Map<String, List<PartitionInfo>> partitionInfos = assignedPartitions.stream()
+            .map(tp -> new PartitionInfo(tp.topic(), tp.partition(), null, null, null))
+            .collect(Collectors.groupingBy(info -> info.topic()));
+        partitionInfos.keySet()
+            .forEach(key -> when(consumerMock.partitionsFor(key))
+                .thenReturn(partitionInfos.get(key)));
+        KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+
+        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+
+        when(contextMock.getComponentTasks(any())).thenReturn(Collections.singletonList(0));
+        when(contextMock.getThisTaskIndex()).thenReturn(0);
+        
+        spout.open(topoConf, contextMock, collectorMock);
+        spout.activate();
+
+        verify(consumerMock).assign(assignedPartitions);
+        
+        return spout;
+    }
+    
+}
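
Typical use of the helper, mirroring KafkaSpoutRetryLimitTest above (spoutConfig, conf, contextMock and collectorMock as declared in that test):

    KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
    TopicPartition partition = new TopicPartition("test", 1);
    KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(
        spoutConfig, conf, contextMock, collectorMock, consumerMock,
        Collections.singleton(partition));
    //The returned spout is open, activated and assigned the given partitions;
    //stub consumerMock.poll(anyLong()) to feed it records.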

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 62dbfe5..d2f38b0 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,6 +17,7 @@
  */
 package org.apache.storm.kafka.spout.builders;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.DEFAULT_MAX_RETRIES;
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
 
 import org.apache.kafka.clients.consumer.ConsumerConfig;
@@ -24,16 +25,26 @@ import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
+import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
 public class SingleTopicKafkaSpoutConfiguration {
+
     public static final String STREAM = "test_stream";
     public static final String TOPIC = "test";
 
+    /**
+     * Retry in a tight loop (keeps unit tests fast).
+     */
+    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0), KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0),
+            DEFAULT_MAX_RETRIES, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
+
     public static Config getConfig() {
         Config config = new Config();
         config.setDebug(true);
@@ -47,20 +58,27 @@ public class SingleTopicKafkaSpoutConfiguration {
         return tp.createTopology();
     }
 
-    public static KafkaSpoutConfig.Builder<String,String> getKafkaSpoutConfigBuilder(int port) {
-        return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
-                .setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
-                        new Fields("topic", "key", "value"), STREAM)
-                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
-                .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
-                .setRetry(getRetryService())
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .setPollTimeoutMs(1000);
+    public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(int port) {
+        return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
+    }
+
+    public static KafkaSpoutConfig.Builder<String, String> getKafkaSpoutConfigBuilder(Subscription subscription, int port) {
+        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
     }
-        
+
+    private static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {
+        return config.setRecordTranslator((r) -> new Values(r.topic(), r.key(), r.value()),
+            new Fields("topic", "key", "value"), STREAM)
+            .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup")
+            .setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 5)
+            .setRetry(getRetryService())
+            .setOffsetCommitPeriodMs(10_000)
+            .setFirstPollOffsetStrategy(EARLIEST)
+            .setMaxUncommittedOffsets(250)
+            .setPollTimeoutMs(1000);
+    }
+
     protected static KafkaSpoutRetryService getRetryService() {
-        return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE;
+        return UNIT_TEST_RETRY_SERVICE;
     }
 }
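
With UNIT_TEST_RETRY_SERVICE moved here from KafkaSpoutConfig, tests reference it from this class, and the new overload lets them inject a Subscription. A sketch of both entry points (port and subscriptionMock as in the tests above):

    KafkaSpoutConfig.Builder<String, String> byTopicName =
        SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder(port);
    KafkaSpoutConfig.Builder<String, String> bySubscription =
        SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder(subscriptionMock, -1)
            .setRetry(SingleTopicKafkaSpoutConfiguration.UNIT_TEST_RETRY_SERVICE);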

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
new file mode 100644
index 0000000..3985619
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class NamedTopicFilterTest {
+
+    private KafkaConsumer<?, ?> consumerMock;
+    
+    @Before
+    public void setUp() {
+        consumerMock = mock(KafkaConsumer.class);
+    }
+    
+    @Test
+    public void testFilter() {
+        String matchingTopicOne = "test-1";
+        String matchingTopicTwo = "test-11";
+        String unmatchedTopic = "unmatched";
+        
+        NamedTopicFilter filter = new NamedTopicFilter(matchingTopicOne, matchingTopicTwo);
+        
+        when(consumerMock.partitionsFor(matchingTopicOne)).thenReturn(Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
+        List<PartitionInfo> partitionTwoPartitions = new ArrayList<>();
+        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+        partitionTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+        when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
+        when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
+        
+        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        
+        assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, 
+            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+            
+    }
+    
+    private PartitionInfo createPartitionInfo(String topic, int partition) {
+        return new PartitionInfo(topic, partition, null, null, null);
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
new file mode 100644
index 0000000..67411e3
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.junit.Before;
+import org.junit.Test;
+
+public class PatternTopicFilterTest {
+
+    private KafkaConsumer<?, ?> consumerMock;
+    
+    @Before
+    public void setUp(){
+        consumerMock = mock(KafkaConsumer.class);
+    }
+    
+    @Test
+    public void testFilter() {
+        Pattern pattern = Pattern.compile("test-\\d+");
+        PatternTopicFilter filter = new PatternTopicFilter(pattern);
+        
+        String matchingTopicOne = "test-1";
+        String matchingTopicTwo = "test-11";
+        String unmatchedTopic = "unmatched";
+        
+        Map<String, List<PartitionInfo>> allTopics = new HashMap<>();
+        allTopics.put(matchingTopicOne, Collections.singletonList(createPartitionInfo(matchingTopicOne, 0)));
+        List<PartitionInfo> testTwoPartitions = new ArrayList<>();
+        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 0));
+        testTwoPartitions.add(createPartitionInfo(matchingTopicTwo, 1));
+        allTopics.put(matchingTopicTwo, testTwoPartitions);
+        allTopics.put(unmatchedTopic, Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
+        
+        when(consumerMock.listTopics()).thenReturn(allTopics);
+        
+        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        
+        assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions,
+            containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));
+    }
+    
+    private PartitionInfo createPartitionInfo(String topic, int partition) {
+        return new PartitionInfo(topic, partition, null, null, null);
+    }
+}


[4/4] storm git commit: Changelog: STORM-2542

Posted by sr...@apache.org.
Changelog: STORM-2542


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/3580dbc8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/3580dbc8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/3580dbc8

Branch: refs/heads/master
Commit: 3580dbc806307fc1bda805f04d910657a6fa61d5
Parents: f67699c
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Jul 19 20:18:11 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jul 19 20:18:11 2017 +0200

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/3580dbc8/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index cda77f1..625e7b5 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option, make KafkaConsumer.assign the default
  * STORM-2133: add page-rendered-at timestamp on the UI
  * STORM-2541: Fix storm-kafka-client manual subscription not being able to start consuming
  * STORM-2622: Add owner resource summary on storm UI


[3/4] storm git commit: Merge branch 'STORM-2542' of https://github.com/srdo/storm into STORM-2542-merge

Posted by sr...@apache.org.
Merge branch 'STORM-2542' of https://github.com/srdo/storm into STORM-2542-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/f67699cc
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/f67699cc
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/f67699cc

Branch: refs/heads/master
Commit: f67699cce3d71c416ee4db3e8692c289cae0f4db
Parents: 10d381b fdb649e
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Wed Jul 19 20:15:20 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jul 19 20:15:20 2017 +0200

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |   7 +-
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 175 ++++++++++---------
 .../spout/ManualPartitionSubscription.java      |  71 --------
 .../storm/kafka/spout/ManualPartitioner.java    |  40 -----
 .../storm/kafka/spout/NamedSubscription.java    |  61 -------
 .../storm/kafka/spout/NamedTopicFilter.java     |  67 -------
 .../storm/kafka/spout/PatternSubscription.java  |  54 ------
 .../storm/kafka/spout/PatternTopicFilter.java   |  69 --------
 .../spout/RoundRobinManualPartitioner.java      |  50 ------
 .../apache/storm/kafka/spout/Subscription.java  |  53 ------
 .../apache/storm/kafka/spout/TopicFilter.java   |  38 ----
 .../ManualPartitionSubscription.java            |  72 ++++++++
 .../spout/subscription/ManualPartitioner.java   |  40 +++++
 .../spout/subscription/NamedTopicFilter.java    |  67 +++++++
 .../spout/subscription/PatternTopicFilter.java  |  69 ++++++++
 .../RoundRobinManualPartitioner.java            |  50 ++++++
 .../kafka/spout/subscription/Subscription.java  |  53 ++++++
 .../kafka/spout/subscription/TopicFilter.java   |  38 ++++
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |  36 ++--
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |  48 ++---
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  37 ++--
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |  74 ++++----
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   7 +-
 .../storm/kafka/spout/NamedTopicFilterTest.java |  69 --------
 .../kafka/spout/PatternTopicFilterTest.java     |  73 --------
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   9 +-
 .../SpoutWithMockedConsumerSetupHelper.java     |  74 ++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  44 +++--
 .../subscription/NamedTopicFilterTest.java      |  68 +++++++
 .../subscription/PatternTopicFilterTest.java    |  73 ++++++++
 30 files changed, 827 insertions(+), 859 deletions(-)
----------------------------------------------------------------------



[2/4] storm git commit: STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option

Posted by sr...@apache.org.
STORM-2542: Remove storm-kafka-client KafkaConsumer.subscribe API option


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/fdb649e3
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/fdb649e3
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/fdb649e3

Branch: refs/heads/master
Commit: fdb649e352e05fd849cafa312bbd62fc75694579
Parents: cd6ca3e
Author: Stig Rohde Døssing <st...@gmail.com>
Authored: Mon Jun 5 14:59:19 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Wed Jul 19 00:18:03 2017 +0200

----------------------------------------------------------------------
 docs/storm-kafka-client.md                      |   7 +-
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 175 ++++++++++---------
 .../spout/ManualPartitionSubscription.java      |  71 --------
 .../storm/kafka/spout/ManualPartitioner.java    |  40 -----
 .../storm/kafka/spout/NamedSubscription.java    |  61 -------
 .../storm/kafka/spout/NamedTopicFilter.java     |  67 -------
 .../storm/kafka/spout/PatternSubscription.java  |  54 ------
 .../storm/kafka/spout/PatternTopicFilter.java   |  69 --------
 .../spout/RoundRobinManualPartitioner.java      |  50 ------
 .../apache/storm/kafka/spout/Subscription.java  |  53 ------
 .../apache/storm/kafka/spout/TopicFilter.java   |  38 ----
 .../ManualPartitionSubscription.java            |  72 ++++++++
 .../spout/subscription/ManualPartitioner.java   |  40 +++++
 .../spout/subscription/NamedTopicFilter.java    |  67 +++++++
 .../spout/subscription/PatternTopicFilter.java  |  69 ++++++++
 .../RoundRobinManualPartitioner.java            |  50 ++++++
 .../kafka/spout/subscription/Subscription.java  |  53 ++++++
 .../kafka/spout/subscription/TopicFilter.java   |  38 ++++
 .../storm/kafka/spout/KafkaSpoutCommitTest.java |  36 ++--
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |  48 ++---
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  37 ++--
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |  74 ++++----
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   7 +-
 .../storm/kafka/spout/NamedTopicFilterTest.java |  69 --------
 .../kafka/spout/PatternTopicFilterTest.java     |  73 --------
 .../kafka/spout/SingleTopicKafkaSpoutTest.java  |   9 +-
 .../SpoutWithMockedConsumerSetupHelper.java     |  74 ++++++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  44 +++--
 .../subscription/NamedTopicFilterTest.java      |  68 +++++++
 .../subscription/PatternTopicFilterTest.java    |  73 ++++++++
 30 files changed, 827 insertions(+), 859 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/docs/storm-kafka-client.md
----------------------------------------------------------------------
diff --git a/docs/storm-kafka-client.md b/docs/storm-kafka-client.md
index ada8619..99b9ae5 100644
--- a/docs/storm-kafka-client.md
+++ b/docs/storm-kafka-client.md
@@ -240,12 +240,9 @@ streams.  If you are doing this for Trident a value must be in the List returned
 otherwise trident can throw exceptions.
 
 
-### Manual Partition Control (ADVANCED)
+### Manual Partition Assignment (ADVANCED)
 
-By default Kafka will automatically assign partitions to the current set of spouts.  It handles lots of things, but in some cases you may want to manually assign the partitions.
-This can cause less churn in the assignments when spouts go down and come back up, but it can result in a lot of issues if not done right.  This can all be handled by subclassing
-Subscription and we have a few implementations that you can look at for examples on how to do this.  ManualPartitionNamedSubscription and ManualPartitionPatternSubscription.  Again
-please be careful when using these or implementing your own.
+By default the KafkaSpout instances will be assigned partitions using a round robin strategy. If you need to customize partition assignment, you must implement the `ManualPartitioner` interface. The implementation can be passed to the `ManualPartitionSubscription` constructor, and the `Subscription` can then be set in the `KafkaSpoutConfig` via the `KafkaSpoutConfig.Builder` constructor. Please take care when supplying a custom implementation, since an incorrect `ManualPartitioner` implementation could leave some partitions unread, or concurrently read by multiple spout instances. See the `RoundRobinManualPartitioner` for an example of how to implement this functionality.
 
 ## Use the Maven Shade Plugin to Build the Uber Jar
 
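
A minimal sketch of the wiring described above, using the constructors visible in this patch (broker address and topic name are placeholders):

    Subscription subscription = new ManualPartitionSubscription(
        new RoundRobinManualPartitioner(), //or a custom ManualPartitioner
        new NamedTopicFilter("my-topic"));
    KafkaSpoutConfig<String, String> config =
        new KafkaSpoutConfig.Builder<String, String>("broker:9092", subscription)
            .build();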

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 6f09f5f..72fa52e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -24,39 +24,41 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Properties;
+import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
+import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
+import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.tuple.Fields;
 
 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics.
  */
 public class KafkaSpoutConfig<K, V> implements Serializable {
+
     private static final long serialVersionUID = 141902646130682494L;
     // 200ms
-    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;            
+    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
     // 30s
-    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;   
+    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
     // Retry forever
-    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     
+    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
     // 10,000,000 records => 80MBs of memory footprint in the worst case
-    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    
+    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
     // 2s
-    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; 
+    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
     public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-    public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =  
-            new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
-                    DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
-    /**
-     * Retry in a tight loop (keep unit tests fasts) do not use in production.
-     */
-    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE = 
-        new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
-            DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));
-    
+    public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+            DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
     private final Subscription subscription;
@@ -73,9 +75,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     /**
      * Creates a new KafkaSpoutConfig using a Builder.
+     *
      * @param builder The Builder to construct the KafkaSpoutConfig from
      */
-    public KafkaSpoutConfig(Builder<K,V> builder) {
+    public KafkaSpoutConfig(Builder<K, V> builder) {
         this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
         this.subscription = builder.subscription;
         this.translator = builder.translator;
@@ -108,12 +111,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         EARLIEST,
         LATEST,
         UNCOMMITTED_EARLIEST,
-        UNCOMMITTED_LATEST 
+        UNCOMMITTED_LATEST
     }
-    
-    public static class Builder<K,V> {
+
+    public static class Builder<K, V> {
+
         private final Map<String, Object> kafkaProps;
-        private Subscription subscription;
+        private final Subscription subscription;
         private RecordTranslator<K, V> translator;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
@@ -123,20 +127,22 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
         private boolean emitNullTuples = false;
 
-        public Builder(String bootstrapServers, String ... topics) {
-            this(bootstrapServers, new NamedSubscription(topics));
+        public Builder(String bootstrapServers, String... topics) {
+            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
         }
-        
-        public Builder(String bootstrapServers, Collection<String> topics) {
-            this(bootstrapServers, new NamedSubscription(topics));
+
+        public Builder(String bootstrapServers, Set<String> topics) {
+            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(),
+                new NamedTopicFilter(topics)));
         }
-        
+
         public Builder(String bootstrapServers, Pattern topics) {
-            this(bootstrapServers, new PatternSubscription(topics));
+            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
         }
-        
+
         /**
          * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
+         *
          * @param bootstrapServers The bootstrap servers the consumer will use
          * @param subscription The subscription defining which topics and partitions each spout instance will read.
          */
@@ -149,30 +155,30 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             this.subscription = subscription;
             this.translator = new DefaultRecordTranslator<>();
         }
-        
+
         /**
-         * Set a {@link KafkaConsumer} property. 
+         * Set a {@link KafkaConsumer} property.
          */
-        public Builder<K,V> setProp(String key, Object value) {
+        public Builder<K, V> setProp(String key, Object value) {
             kafkaProps.put(key, value);
             return this;
         }
-        
+
         /**
          * Set multiple {@link KafkaConsumer} properties.
          */
-        public Builder<K,V> setProp(Map<String, Object> props) {
+        public Builder<K, V> setProp(Map<String, Object> props) {
             kafkaProps.putAll(props);
             return this;
         }
-        
+
         /**
          * Set multiple {@link KafkaConsumer} properties.
          */
-        public Builder<K,V> setProp(Properties props) {
+        public Builder<K, V> setProp(Properties props) {
             props.forEach((key, value) -> {
                 if (key instanceof String) {
-                    kafkaProps.put((String)key, value);
+                    kafkaProps.put((String) key, value);
                 } else {
                     throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
                 }
@@ -183,46 +189,51 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         //Spout Settings
         /**
         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200ms.
+         *
          * @param pollTimeoutMs time in ms
          */
-        public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
+        public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) {
             this.pollTimeoutMs = pollTimeoutMs;
             return this;
         }
 
         /**
         * Specifies the period, in milliseconds, at which the offset commit task is called. Default is 30s.
+         *
          * @param offsetCommitPeriodMs time in ms
          */
-        public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+        public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
             this.offsetCommitPeriodMs = offsetCommitPeriodMs;
             return this;
         }
 
         /**
-         * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
-         * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
-         * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
-         * Note that this limit can in some cases be exceeded, but no partition will exceed this limit by more than maxPollRecords - 1.
+         * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place. Once this
+         * limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number of pending offsets
+         * below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}. Note that this limit can in some cases be exceeded,
+         * but no partition will exceed this limit by more than maxPollRecords - 1.
+         *
         * @param maxUncommittedOffsets max number of records that can be pending commit
          */
-        public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+        public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
             this.maxUncommittedOffsets = maxUncommittedOffsets;
             return this;
         }
 
         /**
-         * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
-         * Please refer to to the documentation in {@link FirstPollOffsetStrategy}
+         * Sets the offset used by the Kafka spout in the first poll to the Kafka broker upon process start. Please refer to the
+         * documentation in {@link FirstPollOffsetStrategy}.
+         *
          * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
-         * */
+         */
         public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
             this.firstPollOffsetStrategy = firstPollOffsetStrategy;
             return this;
         }
-        
+
         /**
          * Sets the retry service for the spout to use.
+         *
          * @param retryService the new retry service
          * @return the builder (this).
          */
@@ -238,9 +249,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             this.translator = translator;
             return this;
         }
-        
+
         /**
          * Configure a translator with tuples to be emitted on the default stream.
+         *
          * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
          * @param fields the names of the fields extracted
          * @return this to be able to chain configuration
@@ -248,9 +260,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
             return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
         }
-        
+
         /**
          * Configure a translator with tuples to be emitted to a given stream.
+         *
          * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
          * @param fields the names of the fields extracted
          * @param stream the stream to emit the tuples on
@@ -259,12 +272,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
             return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
         }
-        
+
         /**
-         * Sets partition refresh period in milliseconds. This is how often kafka will be polled
-         * to check for new topics and/or new partitions.
-         * This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
+         * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
+         * partitions. This setting only matters for Subscription implementations that manually assign partitions, such as the
         * ManualPartitionSubscription used by the Builder constructors above.
+         *
          * @param partitionRefreshPeriodMs time in milliseconds
          * @return the builder (this)
          */
@@ -274,8 +287,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Specifies if the spout should emit null tuples to the component downstream, or rather not emit and directly
-         * ack them. By default this parameter is set to false, which means that null tuples are not emitted.
+         * Specifies if the spout should emit null tuples to the component downstream, or instead directly ack them without emitting. By
+         * default this parameter is set to false, which means that null tuples are not emitted.
+         *
          * @param emitNullTuples sets if null tuples should or not be emitted downstream
          */
         public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) {
@@ -283,34 +297,36 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
-        public KafkaSpoutConfig<K,V> build() {
+        public KafkaSpoutConfig<K, V> build() {
             return new KafkaSpoutConfig<>(this);
         }
     }
-    
-        
+
     /**
      * Factory method that creates a Builder with String key/value deserializers.
+     *
      * @param bootstrapServers The bootstrap servers for the consumer
      * @param topics The topics to subscribe to
      * @return The new builder
      */
-    public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
+    public static Builder<String, String> builder(String bootstrapServers, String... topics) {
         return setStringDeserializers(new Builder<>(bootstrapServers, topics));
     }
-    
+
     /**
      * Factory method that creates a Builder with String key/value deserializers.
+     *
      * @param bootstrapServers The bootstrap servers for the consumer
      * @param topics The topics to subscribe to
      * @return The new builder
      */
-    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+    public static Builder<String, String> builder(String bootstrapServers, Set<String> topics) {
         return setStringDeserializers(new Builder<>(bootstrapServers, topics));
     }
-    
+
     /**
      * Factory method that creates a Builder with String key/value deserializers.
+     *
      * @param bootstrapServers The bootstrap servers for the consumer
      * @param topics The topic pattern to subscribe to
      * @return The new builder
@@ -318,13 +334,13 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
         return setStringDeserializers(new Builder<>(bootstrapServers, topics));
     }
-    
+
     private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
         builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
         return builder;
     }
-    
+
     private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
         // set defaults for properties not specified
         if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
@@ -335,17 +351,18 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     /**
      * Gets the properties that will be passed to the KafkaConsumer.
+     *
      * @return The Kafka properties map
      */
     public Map<String, Object> getKafkaProps() {
         return kafkaProps;
     }
-    
+
     public Subscription getSubscription() {
         return subscription;
     }
-    
-    public RecordTranslator<K,V> getTranslator() {
+
+    public RecordTranslator<K, V> getTranslator() {
         return translator;
     }
 
@@ -358,8 +375,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     }
 
     public boolean isConsumerAutoCommitMode() {
-        return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null     // default is false
-                || Boolean.valueOf((String)kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+        return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
+            || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
     public String getConsumerGroupId() {
@@ -377,7 +394,7 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public KafkaSpoutRetryService getRetryService() {
         return retryService;
     }
-    
+
     public long getPartitionRefreshPeriodMs() {
         return partitionRefreshPeriodMs;
     }
@@ -389,14 +406,14 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     @Override
     public String toString() {
         return "KafkaSpoutConfig{"
-                + "kafkaProps=" + kafkaProps
-                + ", pollTimeoutMs=" + pollTimeoutMs
-                + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
-                + ", maxUncommittedOffsets=" + maxUncommittedOffsets
-                + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
-                + ", subscription=" + subscription
-                + ", translator=" + translator
-                + ", retryService=" + retryService
-                + '}';
+            + "kafkaProps=" + kafkaProps
+            + ", pollTimeoutMs=" + pollTimeoutMs
+            + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
+            + ", maxUncommittedOffsets=" + maxUncommittedOffsets
+            + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
+            + ", subscription=" + subscription
+            + ", translator=" + translator
+            + ", retryService=" + retryService
+            + '}';
     }
 }

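For illustration only (not part of the commit): a minimal sketch of how a topology would configure the spout after this change, using only the Builder factory methods and setters visible in the diff above. The broker address, topic names, and group id are made-up placeholders.

    import java.util.regex.Pattern;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

    public class SpoutConfigExample {
        public static void main(String[] args) {
            // Named topics are now backed by ManualPartitionSubscription + NamedTopicFilter
            KafkaSpoutConfig<String, String> byName = KafkaSpoutConfig
                .builder("broker1:9092", "events", "clicks")
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
                .setOffsetCommitPeriodMs(10_000)
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .build();

            // A topic pattern is now backed by ManualPartitionSubscription + PatternTopicFilter
            KafkaSpoutConfig<String, String> byPattern = KafkaSpoutConfig
                .builder("broker1:9092", Pattern.compile("user-.*"))
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "example-group")
                .build();

            KafkaSpout<String, String> spout = new KafkaSpout<>(byName);
        }
    }

Note that the Collection<String> overloads changed to Set<String> in this commit, so callers passing a List of topic names will need to wrap it in a HashSet.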
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
deleted file mode 100644
index 2c65d6d..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionSubscription.java
+++ /dev/null
@@ -1,71 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-public class ManualPartitionSubscription extends Subscription {
-    private static final long serialVersionUID = 5633018073527583826L;
-    private final ManualPartitioner partitioner;
-    private final TopicFilter partitionFilter;
-    private Set<TopicPartition> currentAssignment = null;
-    private KafkaConsumer<?, ?> consumer = null;
-    private ConsumerRebalanceListener listener = null;
-    private TopologyContext context = null;
-
-    public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
-        this.partitionFilter = partitionFilter;
-        this.partitioner = parter;
-    }
-    
-    @Override
-    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
-        this.consumer = consumer;
-        this.listener = listener;
-        this.context = context;
-        refreshAssignment();
-    }
-    
-    @Override
-    public void refreshAssignment() {
-        List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer);
-        Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
-        Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
-        if (!newAssignment.equals(currentAssignment)) {
-            consumer.assign(newAssignment);
-            if (currentAssignment != null) {
-                listener.onPartitionsRevoked(currentAssignment);
-            }
-            currentAssignment = newAssignment;
-            listener.onPartitionsAssigned(newAssignment);
-        }
-    }
-    
-    @Override
-    public String getTopicsString() {
-        return partitionFilter.getTopicsString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
deleted file mode 100644
index 4856687..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.List;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * A function used to assign partitions to this spout.
- * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions.
- * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
- * number of spouts to avoid missing partitions or double assigning partitions.
- */
-@FunctionalInterface
-public interface ManualPartitioner {
-    /**
-     * Get the partitions for this assignment
-     * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
-     * @param context the context of the topology
-     * @return the subset of the partitions that this spout should use.
-     */
-    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
deleted file mode 100644
index 0eb48cb..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Subscribe to all topics that follow a given list of values.
- */
-public class NamedSubscription extends Subscription {
-    private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
-    private static final long serialVersionUID = 3438543305215813839L;
-    protected final Collection<String> topics;
-    
-    public NamedSubscription(Collection<String> topics) {
-        this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics));
-    }
-    
-    public NamedSubscription(String ... topics) {
-        this(Arrays.asList(topics));
-    }
-
-    @Override
-    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
-        consumer.subscribe(topics, listener);
-        LOG.info("Kafka consumer subscribed topics {}", topics);
-        
-        // Initial poll to get the consumer registration process going.
-        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
-        consumer.poll(0);
-    }
-
-    @Override
-    public String getTopicsString() {
-        return String.join(",", topics);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
deleted file mode 100644
index 982828d..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedTopicFilter.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Filter that returns all partitions for the specified topics.
- */
-public class NamedTopicFilter implements TopicFilter {
-
-    private final Set<String> topics;
-    
-    /**
-     * Create filter based on a set of topic names.
-     * @param topics The topic names the filter will pass.
-     */
-    public NamedTopicFilter(Set<String> topics) {
-        this.topics = Collections.unmodifiableSet(topics);
-    }
-    
-    /**
-     * Convenience constructor.
-     * @param topics The topic names the filter will pass.
-     */
-    public NamedTopicFilter(String... topics) {
-        this(new HashSet<>(Arrays.asList(topics)));
-    }
-    
-    @Override
-    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
-        List<TopicPartition> allPartitions = new ArrayList<>();
-        for (String topic : topics) {
-            for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
-                allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
-            }
-        }
-        return allPartitions;
-    }
-
-    @Override
-    public String getTopicsString() {
-        return String.join(",", topics);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
deleted file mode 100644
index ec53f01..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Subscribe to all topics that match a given pattern.
- */
-public class PatternSubscription extends Subscription {
-    private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class);
-    private static final long serialVersionUID = 3438543305215813839L;
-    protected final Pattern pattern;
-    
-    public PatternSubscription(Pattern pattern) {
-        this.pattern = pattern;
-    }
-
-    @Override
-    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
-        consumer.subscribe(pattern, listener);
-        LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
-        
-        // Initial poll to get the consumer registration process going.
-        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
-        consumer.poll(0);
-    }
-
-    @Override
-    public String getTopicsString() {
-        return pattern.pattern();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
deleted file mode 100644
index 2964874..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternTopicFilter.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.PartitionInfo;
-import org.apache.kafka.common.TopicPartition;
-
-/**
- * Filter that returns all partitions for topics matching the given {@link Pattern}.
- */
-public class PatternTopicFilter implements TopicFilter {
-
-    private final Pattern pattern;
-    private final Set<String> topics = new HashSet<>();
-
-    /**
-     * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
-     *
-     * @param pattern The Pattern to use.
-     */
-    public PatternTopicFilter(Pattern pattern) {
-        this.pattern = pattern;
-    }
-
-    @Override
-    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
-        topics.clear();
-        List<TopicPartition> allPartitions = new ArrayList<>();
-        for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
-            if (pattern.matcher(entry.getKey()).matches()) {
-                for (PartitionInfo partitionInfo : entry.getValue()) {
-                    allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
-                    topics.add(partitionInfo.topic());
-                }
-            }
-        }
-        return allPartitions;
-    }
-
-    @Override
-    public String getTopicsString() {
-        return String.join(",", topics);
-    }
-
-    public String getTopicsPattern() {
-        return pattern.pattern();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
deleted file mode 100644
index 4afcc49..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * Assign partitions in a round robin fashion for all spouts,
- * not just the ones that are alive. Because the parallelism of 
- * the spouts does not typically change while running this makes
- * the assignments more stable in the face of crashing spouts.
- * <p/>
- * Round Robin means that first spout of N spouts will get the first
- * partition, and the N+1th partition... The second spout will get the second partition and
- * N+2th partition etc.
- */
-public class RoundRobinManualPartitioner implements ManualPartitioner {
-
-    @Override
-    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
-        int thisTaskIndex = context.getThisTaskIndex();
-        int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
-        Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1);
-        for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
-            myPartitions.add(allPartitions.get(i));
-        }
-        return new ArrayList<>(myPartitions);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
deleted file mode 100644
index 9c5a8c4..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.io.Serializable;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * A subscription to kafka.
- */
-public abstract class Subscription implements Serializable {
-    private static final long serialVersionUID = -216136367240198716L;
-
-    /**
-     * Subscribe the KafkaConsumer to the proper topics
-     * @param consumer the Consumer to get.
-     * @param listener the rebalance listener to include in the subscription
-     */
-    public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
-    
-    /**
-     * @return A human-readable string representing the subscribed topics.
-     */
-    public abstract String getTopicsString();
-    
-    /**
-     * NOOP is the default behavior, which means that Kafka will internally handle partition assignment.
-     * If you wish to do manual partition management, you must provide an implementation of this method
-     * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe
-     * to inform the rest of the system of those changes.
-     */
-    public void refreshAssignment() {
-        //NOOP
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
deleted file mode 100644
index 7631c8a..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicFilter.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.io.Serializable;
-import java.util.List;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-
-public interface TopicFilter extends Serializable {
-    
-    /**
-     * Get the Kafka TopicPartitions passed by this filter. 
-     * @param consumer The Kafka consumer to use to read the list of existing partitions
-     * @return The Kafka partitions passed by this filter.
-     */
-    List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);
-    
-    /**
-     * @return A human-readable string representing the topics that pass the filter.
-     */
-    String getTopicsString();
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
new file mode 100644
index 0000000..17512ea
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
+import org.apache.storm.task.TopologyContext;
+
+public class ManualPartitionSubscription extends Subscription {
+    private static final long serialVersionUID = 5633018073527583826L;
+    private final ManualPartitioner partitioner;
+    private final TopicFilter partitionFilter;
+    private Set<TopicPartition> currentAssignment = null;
+    private KafkaConsumer<?, ?> consumer = null;
+    private ConsumerRebalanceListener listener = null;
+    private TopologyContext context = null;
+
+    public ManualPartitionSubscription(ManualPartitioner partitioner, TopicFilter partitionFilter) {
+        this.partitionFilter = partitionFilter;
+        this.partitioner = partitioner;
+    }
+    
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
+        this.consumer = consumer;
+        this.listener = listener;
+        this.context = context;
+        refreshAssignment();
+    }
+    
+    @Override
+    public void refreshAssignment() {
+        List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer);
+        Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
+        Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
+        if (!newAssignment.equals(currentAssignment)) {
+            consumer.assign(newAssignment);
+            if (currentAssignment != null) {
+                listener.onPartitionsRevoked(currentAssignment);
+            }
+            currentAssignment = newAssignment;
+            listener.onPartitionsAssigned(newAssignment);
+        }
+    }
+    
+    @Override
+    public String getTopicsString() {
+        return partitionFilter.getTopicsString();
+    }
+}

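A hypothetical wiring sketch (placeholders throughout) showing how this class composes a partitioner with a topic filter, and how the resulting Subscription plugs into the Builder(bootstrapServers, Subscription) constructor shown earlier in the diff. The deserializer properties are set explicitly, the same way setStringDeserializers does above.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.common.serialization.StringDeserializer;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
    import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
    import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
    import org.apache.storm.kafka.spout.subscription.Subscription;

    public class SubscriptionWiringExample {
        public static void main(String[] args) {
            // Partitions of the named topics are spread round-robin over the spout tasks.
            Subscription subscription = new ManualPartitionSubscription(
                new RoundRobinManualPartitioner(),
                new NamedTopicFilter("events", "clicks"));

            KafkaSpoutConfig<String, String> config =
                new KafkaSpoutConfig.Builder<String, String>("broker1:9092", subscription)
                    .setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class)
                    .build();
        }
    }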
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
new file mode 100644
index 0000000..dce7fc6
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.List;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A function used to assign partitions to this spout.
+ * WARNING: if this is not implemented correctly you can really mess things up, e.g. data in some partitions may never be read.
+ * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
+ * number of spouts to avoid missing partitions or assigning the same partition twice.
+ */
+@FunctionalInterface
+public interface ManualPartitioner {
+    /**
+     * Get the partitions for this assignment.
+     * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
+     * @param context the context of the topology
+     * @return the subset of the partitions that this spout should use.
+     */
+    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context);
+}

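Because ManualPartitioner is a @FunctionalInterface with the TopologyContext available, alternative strategies are one method long. As an illustration only (not part of this commit), a hypothetical partitioner that hands out contiguous blocks of the sorted partition list instead of a round-robin stride might look like:

    import java.util.ArrayList;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
    import org.apache.storm.task.TopologyContext;

    public class BlockManualPartitioner implements ManualPartitioner {
        @Override
        public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
            int taskIndex = context.getThisTaskIndex();
            int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
            // Ceiling division, so the earlier tasks absorb any remainder.
            int blockSize = (allPartitions.size() + totalTasks - 1) / totalTasks;
            int start = Math.min(taskIndex * blockSize, allPartitions.size());
            int end = Math.min(start + blockSize, allPartitions.size());
            return new ArrayList<>(allPartitions.subList(start, end));
        }
    }

Either way the contract is the same: every task must derive a disjoint subset from the identical, strictly ordered input list, or partitions will be missed or read twice.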
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
new file mode 100644
index 0000000..d6e5fc2
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for the specified topics.
+ */
+public class NamedTopicFilter implements TopicFilter {
+
+    private final Set<String> topics;
+    
+    /**
+     * Create filter based on a set of topic names.
+     * @param topics The topic names the filter will pass.
+     */
+    public NamedTopicFilter(Set<String> topics) {
+        this.topics = Collections.unmodifiableSet(topics);
+    }
+    
+    /**
+     * Convenience constructor.
+     * @param topics The topic names the filter will pass.
+     */
+    public NamedTopicFilter(String... topics) {
+        this(new HashSet<>(Arrays.asList(topics)));
+    }
+    
+    @Override
+    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for (String topic : topics) {
+            for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
+                allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+            }
+        }
+        return allPartitions;
+    }
+
+    @Override
+    public String getTopicsString() {
+        return String.join(",", topics);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
new file mode 100644
index 0000000..98f8b23
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
@@ -0,0 +1,69 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Filter that returns all partitions for topics matching the given {@link Pattern}.
+ */
+public class PatternTopicFilter implements TopicFilter {
+
+    private final Pattern pattern;
+    private final Set<String> topics = new HashSet<>();
+
+    /**
+     * Creates filter based on a Pattern. Only topic names matching the Pattern are passed by the filter.
+     *
+     * @param pattern The Pattern to use.
+     */
+    public PatternTopicFilter(Pattern pattern) {
+        this.pattern = pattern;
+    }
+
+    @Override
+    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+        topics.clear();
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
+            if (pattern.matcher(entry.getKey()).matches()) {
+                for (PartitionInfo partitionInfo : entry.getValue()) {
+                    allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+                    topics.add(partitionInfo.topic());
+                }
+            }
+        }
+        return allPartitions;
+    }
+
+    @Override
+    public String getTopicsString() {
+        return String.join(",", topics);
+    }
+
+    public String getTopicsPattern() {
+        return pattern.pattern();
+    }
+}

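A short usage sketch with a placeholder pattern. Note from the implementation above that the matched topic set is only populated while listing partitions, so getTopicsString() returns an empty string until getFilteredTopicPartitions() has run at least once.

    import java.util.regex.Pattern;
    import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
    import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
    import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
    import org.apache.storm.kafka.spout.subscription.Subscription;

    public class PatternFilterExample {
        public static void main(String[] args) {
            PatternTopicFilter filter = new PatternTopicFilter(Pattern.compile("user-.*"));
            Subscription subscription =
                new ManualPartitionSubscription(new RoundRobinManualPartitioner(), filter);
            System.out.println(filter.getTopicsPattern()); // prints: user-.*
        }
    }

Since the filter re-lists topics on every call, topics created after the topology starts are picked up on the next refresh, subject to the partition refresh period configured on the Builder.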
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
new file mode 100644
index 0000000..9660c77
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Assign partitions in a round robin fashion for all spouts,
+ * not just the ones that are alive. Because the parallelism of
+ * the spouts does not typically change while running, this makes
+ * the assignments more stable in the face of crashing spouts.
+ * <p/>
+ * Round robin means that the first of N spouts will get the first
+ * partition, the (N+1)th partition, and so on; the second spout will get the second
+ * partition, the (N+2)th partition, etc.
+ */
+public class RoundRobinManualPartitioner implements ManualPartitioner {
+
+    @Override
+    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
+        int thisTaskIndex = context.getThisTaskIndex();
+        int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
+        Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1);
+        for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
+            myPartitions.add(allPartitions.get(i));
+        }
+        return new ArrayList<>(myPartitions);
+    }
+}

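To make the assignment arithmetic concrete, here is a standalone sketch (no Storm classes required) that mirrors the stride loop in partition() for 3 spout tasks and 7 partitions:

    public class RoundRobinDemo {
        public static void main(String[] args) {
            int totalTaskCount = 3;
            int partitionCount = 7;
            for (int task = 0; task < totalTaskCount; task++) {
                StringBuilder assigned = new StringBuilder();
                // Same stride-based loop as RoundRobinManualPartitioner.partition()
                for (int i = task; i < partitionCount; i += totalTaskCount) {
                    assigned.append(i).append(' ');
                }
                System.out.println("task " + task + " -> partitions " + assigned);
            }
            // task 0 -> partitions 0 3 6
            // task 1 -> partitions 1 4
            // task 2 -> partitions 2 5
        }
    }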
http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
new file mode 100644
index 0000000..8091bfa
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to kafka.
+ */
+public abstract class Subscription implements Serializable {
+    private static final long serialVersionUID = -216136367240198716L;
+
+    /**
+     * Subscribe the KafkaConsumer to the proper topics.
+     * @param consumer the consumer to subscribe
+     * @param listener the rebalance listener to include in the subscription
+     * @param context the context of the topology
+     */
+    public abstract <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
+    
+    /**
+     * @return A human-readable string representing the subscribed topics.
+     */
+    public abstract String getTopicsString();
+    
+    /**
+     * NOOP is the default behavior, which means that Kafka will internally handle partition assignment.
+     * If you wish to do manual partition management, you must provide an implementation of this method
+     * that checks with Kafka for any changes and calls the ConsumerRebalanceListener passed to subscribe
+     * to inform the rest of the system of those changes.
+     */
+    public void refreshAssignment() {
+        //NOOP
+    }
+}
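
For manual partition management, one possible shape of a Subscription implementation
is sketched below. This is only a sketch: it wires together the TopicFilter and
ManualPartitioner abstractions from this patch, but the ManualSubscriptionSketch name
and the wiring details are illustrative assumptions, not part of this commit.

    import java.util.Comparator;
    import java.util.HashSet;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.task.TopologyContext;

    public class ManualSubscriptionSketch extends Subscription {
        private static final long serialVersionUID = 1L;
        private final TopicFilter filter;
        private final ManualPartitioner partitioner;
        private transient KafkaConsumer<?, ?> consumer;
        private transient ConsumerRebalanceListener listener;
        private transient TopologyContext context;
        private transient Set<TopicPartition> currentAssignment;

        public ManualSubscriptionSketch(TopicFilter filter, ManualPartitioner partitioner) {
            this.filter = filter;
            this.partitioner = partitioner;
        }

        @Override
        public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
            this.consumer = consumer;
            this.listener = listener;
            this.context = context;
            this.currentAssignment = new HashSet<>();
            refreshAssignment();
        }

        @Override
        public void refreshAssignment() {
            List<TopicPartition> allPartitions = filter.getFilteredTopicPartitions(consumer);
            //Sort so every task sees the same global order before partitioning
            allPartitions.sort(Comparator.comparing(TopicPartition::topic)
                .thenComparingInt(TopicPartition::partition));
            Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
            if (!newAssignment.equals(currentAssignment)) {
                //Inform the rest of the system of the change, then assign
                listener.onPartitionsRevoked(currentAssignment);
                currentAssignment = newAssignment;
                consumer.assign(newAssignment);
                listener.onPartitionsAssigned(newAssignment);
            }
        }

        @Override
        public String getTopicsString() {
            return filter.getTopicsString();
        }
    }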

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
new file mode 100644
index 0000000..497e3ca
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.io.Serializable;
+import java.util.List;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+public interface TopicFilter extends Serializable {
+    
+    /**
+     * Get the Kafka TopicPartitions that pass this filter.
+     * @param consumer the Kafka consumer to use to read the list of existing partitions
+     * @return the Kafka partitions that pass this filter
+     */
+    List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);
+    
+    /**
+     * @return A human-readable string representing the topics that pass the filter.
+     */
+    String getTopicsString();
+
+}
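
A minimal sketch of a TopicFilter for an explicit set of topic names follows. The
NamedTopicFilterSketch name and the null handling are illustrative assumptions; the
patch's concrete filter implementations are defined elsewhere.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;

    public class NamedTopicFilterSketch implements TopicFilter {
        private static final long serialVersionUID = 1L;
        private final Set<String> topics;

        public NamedTopicFilterSketch(Set<String> topics) {
            this.topics = topics;
        }

        @Override
        public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
            List<TopicPartition> filtered = new ArrayList<>();
            for (String topic : topics) {
                List<PartitionInfo> partitionInfos = consumer.partitionsFor(topic);
                if (partitionInfos == null) {
                    //partitionsFor returns null for topics that don't exist yet
                    continue;
                }
                for (PartitionInfo partitionInfo : partitionInfos) {
                    filtered.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
                }
            }
            return filtered;
        }

        @Override
        public String getTopicsString() {
            return String.join(",", topics);
        }
    }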

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
index 8dc34d4..7258fe2 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutCommitTest.java
@@ -26,16 +26,15 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.kafka.clients.consumer.*;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
@@ -50,53 +49,38 @@ public class KafkaSpoutCommitTest {
     private final Map<String, Object> conf = new HashMap<>();
     private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpout<String, String> spout;
-    private KafkaSpoutConfig spoutConfig;
+    private KafkaSpoutConfig<String, String> spoutConfig;
 
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 
-    private void setupSpout(Set<TopicPartition> assignedPartitions) {
+    @Before
+    public void setUp() {
         MockitoAnnotations.initMocks(this);
         spoutConfig = getKafkaSpoutConfigBuilder(-1)
-                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-                .build();
-
+            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+            .build();
         consumerMock = mock(KafkaConsumer.class);
-        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
-        //Set up a spout listening to 1 topic partition
-        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-        spout.open(conf, contextMock, collectorMock);
-        spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
-        //Assign partitions to the spout
-        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
     }
 
     @Test
     public void testCommitSuccessWithOffsetVoids() {
         //Verify that the commit logic can handle offset voids
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
             List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
             // Offsets emitted are 0,1,2,3,4,<void>,8,9
             for (int i = 0; i < 5; i++) {
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+                recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             for (int i = 8; i < 10; i++) {
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+                recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             records.put(partition, recordsForPartition);
 
             when(consumerMock.poll(anyLong()))
-                    .thenReturn(new ConsumerRecords(records));
+                    .thenReturn(new ConsumerRecords<>(records));
 
             for (int i = 0; i < recordsForPartition.size(); i++) {
                 spout.nextTuple();
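
The SpoutWithMockedConsumerSetupHelper used above replaces the per-test setup that
this diff removes. The helper itself is not part of this hunk; the sketch below shows
one way it could work, under the assumption that the spout config exposes a
Mockito-mocked Subscription via a getSubscription() accessor, so the rebalance
listener can be captured from the Subscription rather than from
KafkaConsumer.subscribe:

    //Sketch only: getSubscription() and the mocked Subscription are assumptions
    public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> conf,
            TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock,
            Set<TopicPartition> assignedPartitions) {
        KafkaConsumerFactory<K, V> consumerFactory = (config) -> consumerMock;
        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);

        spout.open(conf, contextMock, collectorMock);
        spout.activate();

        //The spout registers its rebalance listener with the Subscription,
        //not with KafkaConsumer.subscribe, so capture it there
        Subscription subscriptionMock = spoutConfig.getSubscription();
        ArgumentCaptor<ConsumerRebalanceListener> listenerCaptor = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
        verify(subscriptionMock).subscribe(any(KafkaConsumer.class), listenerCaptor.capture(), any(TopologyContext.class));

        //Simulate Kafka assigning these partitions to this spout task
        listenerCaptor.getValue().onPartitionsAssigned(assignedPartitions);
        return spout;
    }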

http://git-wip-us.apache.org/repos/asf/storm/blob/fdb649e3/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 24a2eda..8e6d390 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -16,7 +16,6 @@
 package org.apache.storm.kafka.spout;
 
 import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfigBuilder;
-import static org.mockito.Matchers.anyCollection;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Matchers.anyObject;
 import static org.mockito.Mockito.inOrder;
@@ -32,18 +31,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
 import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.InOrder;
@@ -56,45 +53,30 @@ public class KafkaSpoutEmitTest {
     private final Map<String, Object> conf = new HashMap<>();
     private final TopicPartition partition = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1);
     private KafkaConsumer<String, String> consumerMock;
-    private KafkaSpout<String, String> spout;
-    private KafkaSpoutConfig spoutConfig;
+    private KafkaSpoutConfig<String, String> spoutConfig;
 
-    private void setupSpout(Set<TopicPartition> assignedPartitions) {
+    @Before
+    public void setUp() {
         spoutConfig = getKafkaSpoutConfigBuilder(-1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .build();
-
         consumerMock = mock(KafkaConsumer.class);
-        KafkaConsumerFactory<String, String> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-
-        //Set up a spout listening to 1 topic partition
-        spout = new KafkaSpout<>(spoutConfig, consumerFactory);
-
-        spout.open(conf, contextMock, collectorMock);
-        spout.activate();
-
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        verify(consumerMock).subscribe(anyCollection(), rebalanceListenerCapture.capture());
-
-        //Assign partitions to the spout
-        ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
-        consumerRebalanceListener.onPartitionsAssigned(assignedPartitions);
     }
 
     @Test
     public void testNextTupleEmitsAtMostOneTuple() {
         //The spout should emit at most one message per call to nextTuple
         //This is necessary for Storm to be able to throttle the spout according to maxSpoutPending
-        setupSpout(Collections.singleton(partition));
+        KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
         Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
         List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
         for (int i = 0; i < 10; i++) {
-            recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+            recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
         }
         records.put(partition, recordsForPartition);
 
         when(consumerMock.poll(anyLong()))
-            .thenReturn(new ConsumerRecords(records));
+            .thenReturn(new ConsumerRecords<>(records));
 
         spout.nextTuple();
 
@@ -107,17 +89,17 @@ public class KafkaSpoutEmitTest {
         
         //Emit maxUncommittedOffsets messages, and fail all of them. Then ensure that the spout will retry them when the retry backoff has passed
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
             Map<TopicPartition, List<ConsumerRecord<String, String>>> records = new HashMap<>();
             List<ConsumerRecord<String, String>> recordsForPartition = new ArrayList<>();
             for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
                 //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
-                recordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+                recordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             records.put(partition, recordsForPartition);
 
             when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(records));
+                .thenReturn(new ConsumerRecords<>(records));
 
             for (int i = 0; i < recordsForPartition.size(); i++) {
                 spout.nextTuple();
@@ -172,13 +154,13 @@ public class KafkaSpoutEmitTest {
         
         //Emit maxUncommittedOffsets messages, and fail only the last. Then ensure that the spout will allow no more than maxUncommittedOffsets + maxPollRecords - 1 uncommitted offsets when retrying
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            setupSpout(Collections.singleton(partition));
+            KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, Collections.singleton(partition));
             
             Map<TopicPartition, List<ConsumerRecord<String, String>>> firstPollRecords = new HashMap<>();
             List<ConsumerRecord<String, String>> firstPollRecordsForPartition = new ArrayList<>();
             for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets(); i++) {
                 //This is cheating a bit since maxPollRecords would normally spread this across multiple polls
-                firstPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), i, "key", "value"));
+                firstPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), i, "key", "value"));
             }
             firstPollRecords.put(partition, firstPollRecordsForPartition);
             
@@ -186,13 +168,13 @@ public class KafkaSpoutEmitTest {
             Map<TopicPartition, List<ConsumerRecord<String, String>>> secondPollRecords = new HashMap<>();
             List<ConsumerRecord<String, String>> secondPollRecordsForPartition = new ArrayList<>();
             for(int i = 0; i < maxPollRecords; i++) {
-                secondPollRecordsForPartition.add(new ConsumerRecord(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
+                secondPollRecordsForPartition.add(new ConsumerRecord<>(partition.topic(), partition.partition(), spoutConfig.getMaxUncommittedOffsets() + i, "key", "value"));
             }
             secondPollRecords.put(partition, secondPollRecordsForPartition);
 
             when(consumerMock.poll(anyLong()))
-                .thenReturn(new ConsumerRecords(firstPollRecords))
-                .thenReturn(new ConsumerRecords(secondPollRecords));
+                .thenReturn(new ConsumerRecords<>(firstPollRecords))
+                .thenReturn(new ConsumerRecords<>(secondPollRecords));
 
             for (int i = 0; i < spoutConfig.getMaxUncommittedOffsets() + maxPollRecords; i++) {
                 spout.nextTuple();