Posted to commits@storm.apache.org by ka...@apache.org on 2018/03/01 10:36:08 UTC

[1/3] storm git commit: STORM-2691: storm-kafka-client Trident spout does not implement the Trident spout interface properly

Repository: storm
Updated Branches:
  refs/heads/master 93f9a1733 -> e8e1a4e8f


http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 676cb3d..c0c10f8 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -40,7 +40,6 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
-import org.apache.storm.kafka.spout.subscription.Subscription;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -49,7 +48,6 @@ import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
-import org.mockito.MockitoAnnotations;
 
 import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
 import static org.mockito.ArgumentMatchers.any;
@@ -58,8 +56,22 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
 
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.junit.Rule;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
 public class KafkaSpoutRebalanceTest {
 
+    @Rule
+    public MockitoRule mockito = MockitoJUnit.rule();
+
     @Captor
     private ArgumentCaptor<Map<TopicPartition, OffsetAndMetadata>> commitCapture;
 
@@ -69,23 +81,32 @@ public class KafkaSpoutRebalanceTest {
     private SpoutOutputCollector collectorMock;
     private KafkaConsumer<String, String> consumerMock;
     private KafkaConsumerFactory<String, String> consumerFactory;
+    private TopicFilter topicFilterMock;
+    private ManualPartitioner partitionerMock;
 
     @Before
     public void setUp() {
-        MockitoAnnotations.initMocks(this);
         contextMock = mock(TopologyContext.class);
         collectorMock = mock(SpoutOutputCollector.class);
         consumerMock = mock(KafkaConsumer.class);
         consumerFactory = (kafkaSpoutConfig) -> consumerMock;
+        topicFilterMock = mock(TopicFilter.class);
+        when(topicFilterMock.getAllSubscribedPartitions(any()))
+            .thenReturn(new HashSet<>());
+        partitionerMock = mock(ManualPartitioner.class);
+        when(partitionerMock.getPartitionsForThisTask(any(), any()))
+            .thenReturn(new HashSet<>());
     }
 
     //Returns messageIds in order of emission
-    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture) {
+    private List<KafkaSpoutMessageId> emitOneMessagePerPartitionThenRevokeOnePartition(KafkaSpout<String, String> spout, TopicPartition partitionThatWillBeRevoked, TopicPartition assignedPartition, TopicAssigner topicAssigner) {
         //Setup spout with mock consumer so we can get at the rebalance listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
 
         //Assign partitions to the spout
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(topicAssigner).assignPartitions(any(), any(), rebalanceListenerCapture.capture());
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
         Set<TopicPartition> assignedPartitions = new HashSet<>();
         assignedPartitions.add(partitionThatWillBeRevoked);
@@ -123,21 +144,17 @@
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
         try (SimulatedTime simulatedTime = new SimulatedTime()) {
-            ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-            Subscription subscriptionMock = mock(Subscription.class);
-            doNothing()
-                .when(subscriptionMock)
-                .subscribe(any(), rebalanceListenerCapture.capture(), any());
-            KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+            TopicAssigner assignerMock = mock(TopicAssigner.class);
+            KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
                 .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-                .build(), consumerFactory);
+                .build(), consumerFactory, assignerMock);
             String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
             TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
             TopicPartition assignedPartition = new TopicPartition(topic, 2);
 
             //Emit a message on each partition and revoke the first partition
             List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
-                spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+                spout, partitionThatWillBeRevoked, assignedPartition, assignerMock);
 
             //Ack both emitted tuples
             spout.ack(emittedMessageIds.get(0));
@@ -159,16 +176,12 @@
     @Test
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        Subscription subscriptionMock = mock(Subscription.class);
-        doNothing()
-            .when(subscriptionMock)
-            .subscribe(any(), rebalanceListenerCapture.capture(), any());
+        TopicAssigner assignerMock = mock(TopicAssigner.class);
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
             .setOffsetCommitPeriodMs(10)
             .setRetry(retryServiceMock)
-            .build(), consumerFactory);
+            .build(), consumerFactory, assignerMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);
@@ -179,7 +192,7 @@
 
         //Emit a message on each partition and revoke the first partition
         List<KafkaSpoutMessageId> emittedMessageIds = emitOneMessagePerPartitionThenRevokeOnePartition(
-            spout, partitionThatWillBeRevoked, assignedPartition, rebalanceListenerCapture);
+            spout, partitionThatWillBeRevoked, assignedPartition, assignerMock);
 
         //Check that only two message ids were generated
         verify(retryServiceMock, times(2)).getMessageId(any(TopicPartition.class), anyLong());
@@ -200,14 +213,10 @@
          * Previously assigned partitions should be left alone, since the spout keeps the emitted and acked state for those.
          */
 
-        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
-        Subscription subscriptionMock = mock(Subscription.class);
-        doNothing()
-            .when(subscriptionMock)
-            .subscribe(any(), rebalanceListenerCapture.capture(), any());
-        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(subscriptionMock, -1)
+        TopicAssigner assignerMock = mock(TopicAssigner.class);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1)
             .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
-            .build(), consumerFactory);
+            .build(), consumerFactory, assignerMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition assignedPartition = new TopicPartition(topic, 1);
         TopicPartition newPartition = new TopicPartition(topic, 2);
@@ -215,6 +224,9 @@
         //Setup spout with mock consumer so we can get at the rebalance listener   
         spout.open(conf, contextMock, collectorMock);
         spout.activate();
+        
+        ArgumentCaptor<ConsumerRebalanceListener> rebalanceListenerCapture = ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
+        verify(assignerMock).assignPartitions(any(), any(), rebalanceListenerCapture.capture());
 
         //Assign partitions to the spout
         ConsumerRebalanceListener consumerRebalanceListener = rebalanceListenerCapture.getValue();
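
In outline, the new test seam replaces the old Subscription stubbing: tests hand the spout a mocked TopicAssigner and capture the ConsumerRebalanceListener from the assignPartitions call instead of from Subscription.subscribe. A condensed sketch of the pattern used throughout this file, reusing its fixtures (the stubbing details here are illustrative, not additional committed code):

    TopicAssigner assignerMock = mock(TopicAssigner.class);
    KafkaSpout<String, String> spout = new KafkaSpout<>(
        createKafkaSpoutConfigBuilder(topicFilterMock, partitionerMock, -1).build(),
        (kafkaSpoutConfig) -> consumerMock, assignerMock);
    spout.open(conf, contextMock, collectorMock);
    spout.activate();

    //The spout asks the assigner to assign partitions during open/activate, so the
    //listener can be captured from the mock afterwards rather than stubbed up front
    ArgumentCaptor<ConsumerRebalanceListener> listenerCaptor =
        ArgumentCaptor.forClass(ConsumerRebalanceListener.class);
    verify(assignerMock).assignPartitions(any(), any(), listenerCaptor.capture());
    listenerCaptor.getValue().onPartitionsAssigned(
        Collections.singleton(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0)));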

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
index 64b69b0..0b5c580 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRetryLimitTest.java
@@ -18,12 +18,10 @@ package org.apache.storm.kafka.spout;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -40,7 +38,6 @@ import org.junit.Test;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
 import org.mockito.InOrder;
-import org.mockito.MockitoAnnotations;
 
 import static org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder;
 import static org.mockito.ArgumentMatchers.anyList;
@@ -48,10 +45,17 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.mock;
 
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.junit.Rule;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
 
 public class KafkaSpoutRetryLimitTest {
     
+    @Rule
+    public MockitoRule mockito = MockitoJUnit.rule();
+
     private final long offsetCommitPeriodMs = 2_000;
     private final TopologyContext contextMock = mock(TopologyContext.class);
     private final SpoutOutputCollector collectorMock = mock(SpoutOutputCollector.class);
@@ -69,8 +73,7 @@ public class KafkaSpoutRetryLimitTest {
     
     @Before
     public void setUp() {
-        MockitoAnnotations.initMocks(this);
-        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .setRetry(ZERO_RETRIES_RETRY_SERVICE)
             .build();

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
index d92a3a7..6b8b94b 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/MaxUncommittedOffsetTest.java
@@ -82,7 +82,6 @@ public class MaxUncommittedOffsetTest {
         //This is to verify that a low maxPollRecords does not interfere with reemitting failed tuples
         //The spout must be able to reemit all retriable tuples, even if the maxPollRecords is set to a low value compared to maxUncommittedOffsets.
         assertThat("Current tests require maxPollRecords < maxUncommittedOffsets", maxPollRecords, lessThanOrEqualTo(maxUncommittedOffsets));
-        MockitoAnnotations.initMocks(this);
         spout = new KafkaSpout<>(spoutConfig);
         new KafkaConsumerFactoryDefault<String, String>().createConsumer(spoutConfig);
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
index 05cfd28..2aed182 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SpoutWithMockedConsumerSetupHelper.java
@@ -21,6 +21,7 @@ import static org.mockito.ArgumentMatchers.anyList;
 import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.mockingDetails;
 import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
@@ -41,7 +42,9 @@ import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.mockito.ArgumentCaptor;
@@ -49,8 +52,8 @@ import org.mockito.ArgumentCaptor;
 public class SpoutWithMockedConsumerSetupHelper {
 
     /**
-     * Creates, opens and activates a KafkaSpout using a mocked consumer. The subscription should be a mock object, since this method skips
-     * the subscription and instead just configures the mocked consumer to act as if the specified partitions are assigned to it.
+     * Creates, opens and activates a KafkaSpout using a mocked consumer. The TopicFilter and ManualPartitioner should be mock objects,
+     * since this method short-circuits the TopicPartition assignment process and just calls onPartitionsAssigned on the rebalance listener.
      *
      * @param <K> The Kafka key type
      * @param <V> The Kafka value type
@@ -64,22 +67,24 @@ public class SpoutWithMockedConsumerSetupHelper {
      */
     public static <K, V> KafkaSpout<K, V> setupSpout(KafkaSpoutConfig<K, V> spoutConfig, Map<String, Object> topoConf,
         TopologyContext contextMock, SpoutOutputCollector collectorMock, KafkaConsumer<K, V> consumerMock, TopicPartition... assignedPartitions) {
-        Subscription subscriptionMock = spoutConfig.getSubscription();
-        if (!mockingDetails(subscriptionMock).isMock()) {
-            throw new IllegalStateException("Use a mocked subscription when using this method, it helps avoid complex stubbing");
+        TopicFilter topicFilter = spoutConfig.getTopicFilter();
+        ManualPartitioner topicPartitioner = spoutConfig.getTopicPartitioner();
+        if (!mockingDetails(topicFilter).isMock() || !mockingDetails(topicPartitioner).isMock()) {
+            throw new IllegalStateException("Use a mocked TopicFilter and a mocked ManualPartitioner when using this method; it helps avoid complex stubbing");
         }
         
         Set<TopicPartition> assignedPartitionsSet = new HashSet<>(Arrays.asList(assignedPartitions));
         
-        when(consumerMock.assignment()).thenReturn(assignedPartitionsSet);
+        TopicAssigner assigner = mock(TopicAssigner.class);
         doAnswer(invocation -> {
-            ConsumerRebalanceListener listener = invocation.getArgument(1);
+            ConsumerRebalanceListener listener = invocation.getArgument(2);
             listener.onPartitionsAssigned(assignedPartitionsSet);
             return null;
-        }).when(subscriptionMock).subscribe(any(), any(ConsumerRebalanceListener.class), any());
+        }).when(assigner).assignPartitions(any(), any(), any());
+        when(consumerMock.assignment()).thenReturn(assignedPartitionsSet);
         
         KafkaConsumerFactory<K, V> consumerFactory = (kafkaSpoutConfig) -> consumerMock;
-        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory);
+        KafkaSpout<K, V> spout = new KafkaSpout<>(spoutConfig, consumerFactory, assigner);
         
         spout.open(topoConf, contextMock, collectorMock);
         spout.activate();
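
If the javadoc above is unclear in isolation, a hedged usage sketch of the helper (a single assigned partition; the config values are illustrative):

    KafkaSpoutConfig<String, String> spoutConfig = SingleTopicKafkaSpoutConfiguration
        .createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
        .build();
    KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
    TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
    //The helper builds its own mocked TopicAssigner and fires onPartitionsAssigned(tp) itself,
    //so the returned spout behaves as if tp were assigned, with no real subscription round trip
    KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(
        spoutConfig, new HashMap<>(), mock(TopologyContext.class),
        mock(SpoutOutputCollector.class), consumerMock, tp);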

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
index 3670d8a..4896267 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/config/builder/SingleTopicKafkaSpoutConfiguration.java
@@ -24,7 +24,8 @@ import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
@@ -37,8 +38,8 @@ public class SingleTopicKafkaSpoutConfiguration {
         return setCommonSpoutConfig(KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC));
     }
 
-    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(Subscription subscription, int port) {
-        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, subscription));
+    public static KafkaSpoutConfig.Builder<String, String> createKafkaSpoutConfigBuilder(TopicFilter topicFilter, ManualPartitioner topicPartitioner, int port) {
+        return setCommonSpoutConfig(new KafkaSpoutConfig.Builder<>("127.0.0.1:" + port, topicFilter, topicPartitioner));
     }
 
     public static KafkaSpoutConfig.Builder<String, String> setCommonSpoutConfig(KafkaSpoutConfig.Builder<String, String> config) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
deleted file mode 100644
index 2355283..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscriptionTest.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Copyright 2017 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.subscription;
-
-import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyList;
-import static org.mockito.Mockito.clearInvocations;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
-import org.apache.storm.task.TopologyContext;
-import org.junit.Test;
-import org.mockito.InOrder;
-
-public class ManualPartitionSubscriptionTest {
-
-    @Test
-    public void testCanReassignPartitions() {
-        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
-        TopicFilter filterMock = mock(TopicFilter.class);
-        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
-        ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class);
-        TopologyContext contextMock = mock(TopologyContext.class);
-        ManualPartitionSubscription subscription = new ManualPartitionSubscription(partitionerMock, filterMock);
-        
-        List<TopicPartition> onePartition = Collections.singletonList(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
-        List<TopicPartition> twoPartitions = new ArrayList<>();
-        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
-        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1));
-        when(partitionerMock.partition(anyList(), any(TopologyContext.class)))
-            .thenReturn(onePartition)
-            .thenReturn(twoPartitions);
-        
-        //Set the first assignment
-        subscription.subscribe(consumerMock, listenerMock, contextMock);
-        
-        InOrder inOrder = inOrder(consumerMock, listenerMock);
-        inOrder.verify(consumerMock).assign(new HashSet<>(onePartition));
-        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition));
-        
-        clearInvocations(consumerMock, listenerMock);
-        
-        when(consumerMock.assignment()).thenReturn(new HashSet<>(onePartition));
-        
-        //Update to set the second assignment
-        subscription.refreshAssignment();
-        
-        //The partition revocation hook must be called before the new partitions are assigned to the consumer,
-        //to allow the revocation hook to commit offsets for the revoked partitions.
-        inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition));
-        inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions));
-        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions));
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
index 3985619..a30a23a 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilterTest.java
@@ -24,6 +24,7 @@ import static org.mockito.Mockito.when;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
@@ -54,7 +55,7 @@ public class NamedTopicFilterTest {
         when(consumerMock.partitionsFor(matchingTopicTwo)).thenReturn(partitionTwoPartitions);
         when(consumerMock.partitionsFor(unmatchedTopic)).thenReturn(Collections.singletonList(createPartitionInfo(unmatchedTopic, 0)));
         
-        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        Set<TopicPartition> matchedPartitions = filter.getAllSubscribedPartitions(consumerMock);
         
         assertThat("Expected filter to pass only topics with exact name matches", matchedPartitions, 
             containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
index 67411e3..01e0b3d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilterTest.java
@@ -26,6 +26,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
@@ -61,7 +62,7 @@ public class PatternTopicFilterTest {
         
         when(consumerMock.listTopics()).thenReturn(allTopics);
         
-        List<TopicPartition> matchedPartitions = filter.getFilteredTopicPartitions(consumerMock);
+        Set<TopicPartition> matchedPartitions = filter.getAllSubscribedPartitions(consumerMock);
         
         assertThat("Expected topic partitions matching the pattern to be passed by the filter", matchedPartitions,
             containsInAnyOrder(new TopicPartition(matchingTopicOne, 0), new TopicPartition(matchingTopicTwo, 0), new TopicPartition(matchingTopicTwo, 1)));

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java
new file mode 100644
index 0000000..f4deeba
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitionerTest.java
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.task.TopologyContext;
+import org.junit.Test;
+
+public class RoundRobinManualPartitionerTest {
+
+    private TopicPartition createTp(int partition) {
+        return new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, partition);
+    }
+    
+    private Set<TopicPartition> partitionsToTps(int[] expectedPartitions) {
+        Set<TopicPartition> expectedTopicPartitions = new HashSet<>();
+        for(int i = 0; i < expectedPartitions.length; i++) {
+            expectedTopicPartitions.add(createTp(expectedPartitions[i]));
+        }
+        return expectedTopicPartitions;
+    }
+    
+    @Test
+    public void testRoundRobinPartitioning() {
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for(int i = 0; i < 11; i++) {
+            allPartitions.add(createTp(i));
+        }
+        List<TopologyContext> contextMocks = new ArrayList<>();
+        String thisComponentId = "A spout";
+        List<Integer> allTasks = Arrays.asList(new Integer[]{0, 1, 2});
+        for(int i = 0; i < 3; i++) {
+            TopologyContext contextMock = mock(TopologyContext.class);
+            when(contextMock.getThisTaskIndex()).thenReturn(i);
+            when(contextMock.getThisComponentId()).thenReturn(thisComponentId);
+            when(contextMock.getComponentTasks(thisComponentId)).thenReturn(allTasks);
+            contextMocks.add(contextMock);
+        }
+        RoundRobinManualPartitioner partitioner = new RoundRobinManualPartitioner();
+        
+        Set<TopicPartition> partitionsForFirstTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(0));
+        assertThat(partitionsForFirstTask, is(partitionsToTps(new int[]{0, 3, 6, 9})));
+        
+        Set<TopicPartition> partitionsForSecondTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(1));
+        assertThat(partitionsForSecondTask, is(partitionsToTps(new int[]{1, 4, 7, 10})));
+        
+        Set<TopicPartition> partitionsForThirdTask = partitioner.getPartitionsForThisTask(allPartitions, contextMocks.get(2));
+        assertThat(partitionsForThirdTask, is(partitionsToTps(new int[]{2, 5, 8})));
+    }
+    
+}
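
The three assertions pin down the distribution rule: with n tasks, task i takes partitions i, i+n, i+2n, and so on. A sketch of that rule under the test's assumptions (not necessarily the committed implementation):

    //Task taskIndex of taskCount takes every taskCount-th partition, starting at its own index
    int taskIndex = context.getThisTaskIndex();
    int taskCount = context.getComponentTasks(context.getThisComponentId()).size();
    Set<TopicPartition> myPartitions = new HashSet<>();
    for (int i = taskIndex; i < allPartitions.size(); i += taskCount) {
        myPartitions.add(allPartitions.get(i));
    }
    return myPartitions;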

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java
new file mode 100644
index 0000000..96bbc1c
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/subscription/TopicAssignerTest.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import static org.mockito.Mockito.clearInvocations;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.junit.Test;
+import org.mockito.InOrder;
+
+public class TopicAssignerTest {
+
+    @Test
+    public void testCanReassignPartitions() {    
+        Set<TopicPartition> onePartition = Collections.singleton(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        Set<TopicPartition> twoPartitions = new HashSet<>();
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0));
+        twoPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 1));
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        ConsumerRebalanceListener listenerMock = mock(ConsumerRebalanceListener.class);
+        TopicAssigner assigner = new TopicAssigner();
+        
+        //Set the first assignment
+        assigner.assignPartitions(consumerMock, onePartition, listenerMock);
+        
+        InOrder inOrder = inOrder(consumerMock, listenerMock);
+        inOrder.verify(listenerMock).onPartitionsRevoked(Collections.emptySet());
+        inOrder.verify(consumerMock).assign(new HashSet<>(onePartition));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(onePartition));
+        
+        clearInvocations(consumerMock, listenerMock);
+        
+        when(consumerMock.assignment()).thenReturn(new HashSet<>(onePartition));
+        
+        //Update to set the second assignment
+        assigner.assignPartitions(consumerMock, twoPartitions, listenerMock);
+        
+        //The partition revocation hook must be called before the new partitions are assigned to the consumer,
+        //to allow the revocation hook to commit offsets for the revoked partitions.
+        inOrder.verify(listenerMock).onPartitionsRevoked(new HashSet<>(onePartition));
+        inOrder.verify(consumerMock).assign(new HashSet<>(twoPartitions));
+        inOrder.verify(listenerMock).onPartitionsAssigned(new HashSet<>(twoPartitions));
+    }
+    
+}
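
The in-order verification spells out the assignment contract: revoke the consumer's current assignment first, then hand the new set to the consumer, then fire the assigned hook. A sketch consistent with what the test verifies (illustrative; the committed TopicAssigner may differ in detail):

    public <K, V> void assignPartitions(KafkaConsumer<K, V> consumer,
        Set<TopicPartition> newAssignment, ConsumerRebalanceListener listener) {
        //Revoke before reassigning so the listener can commit offsets for partitions it is losing
        listener.onPartitionsRevoked(new HashSet<>(consumer.assignment()));
        consumer.assign(newAssignment);
        listener.onPartitionsAssigned(newAssignment);
    }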

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
index a5c78a8..a15e415 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadataTest.java
@@ -19,9 +19,9 @@ package org.apache.storm.kafka.spout.trident;
 import static org.hamcrest.CoreMatchers.is;
 import static org.junit.Assert.assertThat;
 
-import java.util.Collections;
+import java.util.List;
 import java.util.Map;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
 import org.json.simple.JSONValue;
@@ -38,27 +38,24 @@ public class KafkaTridentSpoutBatchMetadataTest {
          * It is important that all map entries are types json-simple knows about,
          * since otherwise the library just calls toString on them which will likely produce invalid JSON.
          */
-        TopicPartition tp = new TopicPartition("topic", 0);
         long startOffset = 10;
         long endOffset = 20;
 
-        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, startOffset, endOffset);
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(startOffset, endOffset);
         Map<String, Object> map = metadata.toMap();
         Map deserializedMap = (Map)JSONValue.parseWithException(JSONValue.toJSONString(map));
         KafkaTridentSpoutBatchMetadata deserializedMetadata = KafkaTridentSpoutBatchMetadata.fromMap(deserializedMap);
-        assertThat(deserializedMetadata.getTopicPartition(), is(metadata.getTopicPartition()));
         assertThat(deserializedMetadata.getFirstOffset(), is(metadata.getFirstOffset()));
         assertThat(deserializedMetadata.getLastOffset(), is(metadata.getLastOffset()));
     }
 
     @Test
     public void testCreateMetadataFromRecords() {
-        TopicPartition tp = new TopicPartition("topic", 0);
         long firstOffset = 15;
         long lastOffset = 55;
-        ConsumerRecords<?, ?> records = new ConsumerRecords<>(Collections.singletonMap(tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, (int) (lastOffset - firstOffset + 1))));
+        List<ConsumerRecord<String, String>> records = SpoutWithMockedConsumerSetupHelper.createRecords(new TopicPartition("test", 0), firstOffset, (int) (lastOffset - firstOffset + 1));
 
-        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(tp, records);
+        KafkaTridentSpoutBatchMetadata metadata = new KafkaTridentSpoutBatchMetadata(records);
         assertThat("The first offset should be the first offset in the record set", metadata.getFirstOffset(), is(firstOffset));
         assertThat("The last offset should be the last offset in the record set", metadata.getLastOffset(), is(lastOffset));
     }
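
The serialization test only passes if every value in toMap() is a type json-simple can emit natively; anything else gets toString()-ed into likely-invalid JSON. A minimal round-trip sketch of what the test exercises (the map keys here are assumptions, not the class's actual field names):

    Map<String, Object> map = new HashMap<>();
    map.put("firstOffset", 10L); //a boxed Long is JSON-native, so it survives the round trip
    map.put("lastOffset", 20L);
    String json = JSONValue.toJSONString(map);
    Map<?, ?> roundTripped = (Map<?, ?>) JSONValue.parseWithException(json);
    //json-simple parses JSON numbers back as Long, so the offsets compare equal
    assertThat(((Number) roundTripped.get("firstOffset")).longValue(), is(10L));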

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
new file mode 100644
index 0000000..6208ce4
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitterTest.java
@@ -0,0 +1,214 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyList;
+import static org.mockito.ArgumentMatchers.anyLong;
+import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.SpoutWithMockedConsumerSetupHelper;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.junit.Rule;
+import org.junit.Test;
+import org.mockito.ArgumentCaptor;
+import org.mockito.Captor;
+import org.mockito.InOrder;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
+public class KafkaTridentSpoutEmitterTest {
+    
+    @Rule
+    public MockitoRule mockito = MockitoJUnit.rule();
+    
+    @Captor
+    public ArgumentCaptor<Collection<TopicPartition>> assignmentCaptor;
+    
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+    
+    @Test
+    public void testGetOrderedPartitionsIsConsistent() {
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .build(),
+            mock(TopologyContext.class),
+            mock(KafkaConsumerFactory.class), new TopicAssigner());
+        
+        Set<TopicPartition> allPartitions = new HashSet<>();
+        int numPartitions = 10;
+        for (int i = 0; i < numPartitions; i++) {
+            allPartitions.add(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+        }
+        List<Map<String, Object>> serializedPartitions = allPartitions.stream()
+            .map(tp -> tpSerializer.toMap(tp))
+            .collect(Collectors.toList());
+        
+        List<KafkaTridentSpoutTopicPartition> orderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
+        assertThat("Should contain all partitions", orderedPartitions.size(), is(allPartitions.size()));
+        Collections.shuffle(serializedPartitions);
+        List<KafkaTridentSpoutTopicPartition> secondGetOrderedPartitions = emitter.getOrderedPartitions(serializedPartitions);
+        assertThat("Ordering must be consistent", secondGetOrderedPartitions, is(orderedPartitions));
+        
+        serializedPartitions.add(tpSerializer.toMap(new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, numPartitions)));
+        List<KafkaTridentSpoutTopicPartition> orderedPartitionsWithNewPartition = emitter.getOrderedPartitions(serializedPartitions);
+        orderedPartitionsWithNewPartition.remove(orderedPartitionsWithNewPartition.size() - 1);
+        assertThat("Adding new partitions should not shuffle the existing ordering", orderedPartitionsWithNewPartition, is(orderedPartitions));
+    }
+    
+    @Test
+    public void testGetPartitionsForTask() {
+        //Verify correct wrapping/unwrapping of partition and delegation of partition assignment
+        ManualPartitioner partitionerMock = mock(ManualPartitioner.class);
+        when(partitionerMock.getPartitionsForThisTask(any(), any()))
+            .thenAnswer(invocation -> {
+                List<TopicPartition> partitions = new ArrayList<>((List<TopicPartition>) invocation.getArguments()[0]);
+                partitions.remove(0);
+                return new HashSet<>(partitions);
+            });
+        
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mock(TopicFilter.class), partitionerMock, -1)
+                .build(),
+            mock(TopologyContext.class),
+            mock(KafkaConsumerFactory.class), new TopicAssigner());
+        
+        List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+        }
+        List<TopicPartition> unwrappedPartitions = allPartitions.stream()
+            .map(kttp -> kttp.getTopicPartition())
+            .collect(Collectors.toList());
+        
+        List<KafkaTridentSpoutTopicPartition> partitionsForTask = emitter.getPartitionsForTask(0, 2, allPartitions);
+        verify(partitionerMock).getPartitionsForThisTask(eq(unwrappedPartitions), any(TopologyContext.class));
+        allPartitions.remove(0);
+        assertThat("Should have assigned all except the first partition to this task", new HashSet<>(partitionsForTask), is(new HashSet<>(allPartitions)));
+    }
+    
+    @Test
+    public void testAssignPartitions() {
+        //Verify correct unwrapping of partitions and delegation of assignment
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        KafkaConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
+        TopicAssigner assignerMock = mock(TopicAssigner.class);
+        
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .build(),
+            mock(TopologyContext.class),
+            consumerFactory, assignerMock);
+        
+        List<KafkaTridentSpoutTopicPartition> allPartitions = new ArrayList<>();
+        for (int i = 0; i < 10; i++) {
+            allPartitions.add(new KafkaTridentSpoutTopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, i));
+        }
+        Set<TopicPartition> unwrappedPartitions = allPartitions.stream()
+            .map(kttp -> kttp.getTopicPartition())
+            .collect(Collectors.toSet());
+        
+        emitter.refreshPartitions(allPartitions);
+        
+        verify(assignerMock).assignPartitions(any(KafkaConsumer.class), eq(unwrappedPartitions), any(ConsumerRebalanceListener.class));
+    }
+    
+    private Map<String, Object> doEmitBatchTest(KafkaConsumer<String, String> consumerMock, TridentCollector collectorMock, TopicPartition tp, long firstOffset, int numRecords, Map<String, Object> previousBatchMeta) {
+        when(consumerMock.assignment()).thenReturn(Collections.singleton(tp));
+        when(consumerMock.poll(anyLong())).thenReturn(new ConsumerRecords<>(Collections.singletonMap(
+            tp, SpoutWithMockedConsumerSetupHelper.createRecords(tp, firstOffset, numRecords))));
+        KafkaConsumerFactory<String, String> consumerFactory = spoutConfig -> consumerMock;
+        
+        KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(-1)
+                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
+                .build(),
+            mock(TopologyContext.class),
+            consumerFactory, new TopicAssigner());
+        
+        TransactionAttempt txid = new TransactionAttempt(10L, 0);
+        KafkaTridentSpoutTopicPartition kttp = new KafkaTridentSpoutTopicPartition(tp);
+        return emitter.emitPartitionBatch(txid, collectorMock, kttp, previousBatchMeta);
+    }
+    
+    @Test
+    public void testEmitBatchWithNullMeta() {
+        //Check that null meta makes the spout seek according to FirstPollOffsetStrategy, and that the returned meta is correct
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        TridentCollector collectorMock = mock(TridentCollector.class);
+        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        long firstOffset = 0;
+        int numRecords = 10;
+        Map<String, Object> batchMeta = doEmitBatchTest(consumerMock, collectorMock, tp, firstOffset, numRecords, null);
+        
+        InOrder inOrder = inOrder(consumerMock, collectorMock);
+        inOrder.verify(consumerMock).seekToBeginning(Collections.singleton(tp));
+        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(collectorMock, times(numRecords)).emit(anyList());
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
+    }
+    
+    @Test
+    public void testEmitBatchWithPreviousMeta() {
+        //Check that non-null meta makes the spout seek according to the provided metadata, and that the returned meta is correct
+        KafkaConsumer<String, String> consumerMock = mock(KafkaConsumer.class);
+        TridentCollector collectorMock = mock(TridentCollector.class);
+        TopicPartition tp = new TopicPartition(SingleTopicKafkaSpoutConfiguration.TOPIC, 0);
+        long firstOffset = 50;
+        int numRecords = 10;
+        KafkaTridentSpoutBatchMetadata previousBatchMeta = new KafkaTridentSpoutBatchMetadata(0, firstOffset - 1);
+        Map<String, Object> batchMeta = doEmitBatchTest(consumerMock, collectorMock, tp, firstOffset, numRecords, previousBatchMeta.toMap());
+        
+        InOrder inOrder = inOrder(consumerMock, collectorMock);
+        inOrder.verify(consumerMock).seek(tp, firstOffset);
+        inOrder.verify(consumerMock).poll(anyLong());
+        inOrder.verify(collectorMock, times(numRecords)).emit(anyList());
+        KafkaTridentSpoutBatchMetadata deserializedMeta = KafkaTridentSpoutBatchMetadata.fromMap(batchMeta);
+        assertThat("The batch should start at the first offset of the polled records", deserializedMeta.getFirstOffset(), is(firstOffset));
+        assertThat("The batch should end at the last offset of the polled messages", deserializedMeta.getLastOffset(), is(firstOffset + numRecords - 1));
+    }
+    
+}
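
testGetOrderedPartitionsIsConsistent requires an ordering that is stable across calls and keeps its existing prefix when partitions are added. Sorting by topic and partition number has both properties for this test's data, since the added partition carries the highest index; a sketch under that assumption (the committed emitter may order differently):

    //Deserialize, then impose a total order that does not depend on input order
    List<TopicPartition> ordered = serializedPartitions.stream()
        .map(map -> tpSerializer.fromMap(map))
        .sorted(Comparator.comparing(TopicPartition::topic)
            .thenComparingInt(TopicPartition::partition))
        .collect(Collectors.toList());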

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
new file mode 100644
index 0000000..1abe551
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinatorTest.java
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertThat;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
+import org.apache.storm.utils.Time;
+import org.apache.storm.utils.Time.SimulatedTime;
+import org.junit.Test;
+
+public class KafkaTridentSpoutOpaqueCoordinatorTest {
+
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+
+    @Test
+    public void testCanGetPartitions() {
+        KafkaConsumer<String, String> mockConsumer = mock(KafkaConsumer.class);
+        TopicPartition expectedPartition = new TopicPartition("test", 0);
+        TopicFilter mockFilter = mock(TopicFilter.class);
+        when(mockFilter.getAllSubscribedPartitions(any())).thenReturn(Collections.singleton(expectedPartition));
+
+        KafkaSpoutConfig<String, String> spoutConfig = 
+            SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+                .build();
+        KafkaTridentSpoutOpaqueCoordinator<String, String> coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(spoutConfig, ignored -> mockConsumer);
+
+        List<Map<String, Object>> partitionsForBatch = coordinator.getPartitionsForBatch();
+
+        List<TopicPartition> tps = deserializePartitions(partitionsForBatch);
+
+        verify(mockFilter).getAllSubscribedPartitions(mockConsumer);
+        assertThat(tps, contains(expectedPartition));
+
+    }
+
+    @Test
+    public void testCanUpdatePartitions() {
+        try (SimulatedTime time = new SimulatedTime()) {
+            KafkaConsumer<String, String> mockConsumer = mock(KafkaConsumer.class);
+            TopicPartition expectedPartition = new TopicPartition("test", 0);
+            TopicPartition addedLaterPartition = new TopicPartition("test-2", 0);
+            HashSet<TopicPartition> allPartitions = new HashSet<>();
+            allPartitions.add(expectedPartition);
+            allPartitions.add(addedLaterPartition);
+            TopicFilter mockFilter = mock(TopicFilter.class);
+            when(mockFilter.getAllSubscribedPartitions(any()))
+                .thenReturn(Collections.singleton(expectedPartition))
+                .thenReturn(allPartitions);
+
+            KafkaSpoutConfig<String, String> spoutConfig = 
+                SingleTopicKafkaSpoutConfiguration.createKafkaSpoutConfigBuilder(mockFilter, mock(ManualPartitioner.class), -1)
+                    .build();
+            KafkaTridentSpoutOpaqueCoordinator<String, String> coordinator = new KafkaTridentSpoutOpaqueCoordinator<>(spoutConfig, ignored -> mockConsumer);
+
+            List<Map<String, Object>> partitionsForBatch = coordinator.getPartitionsForBatch();
+
+            List<TopicPartition> firstBatchTps = deserializePartitions(partitionsForBatch);
+            
+            verify(mockFilter).getAllSubscribedPartitions(mockConsumer);
+            assertThat(firstBatchTps, contains(expectedPartition));
+
+            Time.advanceTime(KafkaTridentSpoutOpaqueCoordinator.TIMER_DELAY_MS + spoutConfig.getPartitionRefreshPeriodMs());
+
+            List<Map<String, Object>> partitionsForSecondBatch = coordinator.getPartitionsForBatch();
+            
+            List<TopicPartition> secondBatchTps = deserializePartitions(partitionsForSecondBatch);
+            verify(mockFilter, times(2)).getAllSubscribedPartitions(mockConsumer);
+            assertThat(new HashSet<>(secondBatchTps), is(allPartitions));
+
+        }
+    }
+
+    private List<TopicPartition> deserializePartitions(List<Map<String, Object>> tps) {
+        return tps.stream()
+            .map(map -> tpSerializer.fromMap(map))
+            .collect(Collectors.toList());
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 2260d66..3886a82 100644
--- a/pom.xml
+++ b/pom.xml
@@ -896,12 +896,12 @@
                 <artifactId>guava</artifactId>
                 <version>${guava.version}</version>
             </dependency>
-            <dependency>
-                <groupId>com.google.auto.service</groupId>
-                <artifactId>auto-service</artifactId>
-                <version>${auto-service.version}</version>
-                <optional>true</optional>
-            </dependency>
+                <dependency>
+                    <groupId>com.google.auto.service</groupId>
+                    <artifactId>auto-service</artifactId>
+                    <version>${auto-service.version}</version>
+                    <optional>true</optional>
+                </dependency>
             <dependency>
                 <groupId>org.apache.logging.log4j</groupId>
                 <artifactId>log4j-api</artifactId>

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
index 5fe3c65..8dd1301 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/spout/IOpaquePartitionedTridentSpout.java
@@ -72,23 +72,30 @@ public interface IOpaquePartitionedTridentSpout<Partitions, Partition extends IS
         /**
          * This method is called when this task is responsible for a new set of partitions. Should be used
          * to manage things like connections to brokers.
+         * @param partitionResponsibilities The partitions assigned to this task
          */        
         void refreshPartitions(List<Partition> partitionResponsibilities);
 
         /**
-         * @return The oredered list of partitions being processed by all the tasks
+         * Sorts the partition info to produce an ordered list of partitions.
+         * @param allPartitionInfo The partition info for all partitions being processed by all spout tasks
+         * @return The ordered list of partitions being processed by all the tasks. The ordering must be consistent for all tasks.
          */
         List<Partition> getOrderedPartitions(Partitions allPartitionInfo);
 
         /**
+         * Get the partitions assigned to this task.
+         * @param taskId The id of this task
+         * @param numTasks The number of tasks for this spout
+         * @param allPartitionInfoSorted The partition info for all partitions being processed by all spout tasks,
+         *     sorted according to {@link #getOrderedPartitions(java.lang.Object)}
          * @return The list of partitions that are to be processed by the task with id {@code taskId}
          */
-        default List<Partition> getPartitionsForTask(int taskId, int numTasks, Partitions allPartitionInfo){
-            final List<Partition> orderedPartitions = getOrderedPartitions(allPartitionInfo);
-            final List<Partition> taskPartitions = new ArrayList<>(orderedPartitions == null ? 0 : orderedPartitions.size());
-            if (orderedPartitions != null) {
-                for (int i = taskId; i < orderedPartitions.size(); i += numTasks) {
-                    taskPartitions.add(orderedPartitions.get(i));
+        default List<Partition> getPartitionsForTask(int taskId, int numTasks, List<Partition> allPartitionInfoSorted){
+            final List<Partition> taskPartitions = new ArrayList<>(allPartitionInfoSorted == null ? 0 : allPartitionInfoSorted.size());
+            if (allPartitionInfoSorted != null) {
+                for (int i = taskId; i < allPartitionInfoSorted.size(); i += numTasks) {
+                    taskPartitions.add(allPartitionInfoSorted.get(i));
                 }
             }
             return taskPartitions;
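
For reference, the stride in the new default getPartitionsForTask can be sketched outside the interface like this (the class and partition names are illustrative, not part of the commit): with two tasks, task 0 takes indices 0, 2, 4 of the sorted list and task 1 takes indices 1, 3.

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.List;

    public class RoundRobinAssignmentSketch {

        // Same stride as the default method above: start at taskId, step by numTasks.
        static <P> List<P> partitionsForTask(int taskId, int numTasks, List<P> sorted) {
            List<P> taskPartitions = new ArrayList<>();
            for (int i = taskId; i < sorted.size(); i += numTasks) {
                taskPartitions.add(sorted.get(i));
            }
            return taskPartitions;
        }

        public static void main(String[] args) {
            List<String> sorted = Arrays.asList("p0", "p1", "p2", "p3", "p4");
            System.out.println(partitionsForTask(0, 2, sorted)); // [p0, p2, p4]
            System.out.println(partitionsForTask(1, 2, sorted)); // [p1, p3]
        }
    }

Because every task applies this stride to the same list, the ordering produced by getOrderedPartitions must be identical across tasks; otherwise partitions could be skipped or assigned twice.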

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
----------------------------------------------------------------------
diff --git a/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java b/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
index e7bf70a..f381318 100644
--- a/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
+++ b/storm-client/src/jvm/org/apache/storm/trident/spout/OpaquePartitionedTridentSpoutExecutor.java
@@ -111,13 +111,13 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
 
             if(_savedCoordinatorMeta==null || !_savedCoordinatorMeta.equals(coordinatorMeta)) {
                 _partitionStates.clear();
-                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, coordinatorMeta);
+                final List<ISpoutPartition> sortedPartitions = _emitter.getOrderedPartitions(coordinatorMeta);
+                final List<ISpoutPartition> taskPartitions = _emitter.getPartitionsForTask(_index, _numTasks, sortedPartitions);
                 for (ISpoutPartition partition : taskPartitions) {
                     _partitionStates.put(partition.getId(), new EmitterPartitionState(new RotatingTransactionalState(_state, partition.getId()), partition));
                 }
+                _emitter.refreshPartitions(taskPartitions);
 
-                // refresh all partitions for backwards compatibility with old spout
-                _emitter.refreshPartitions(_emitter.getOrderedPartitions(coordinatorMeta));
                 _savedCoordinatorMeta = coordinatorMeta;
                 _changedMeta = true;
             }
@@ -137,7 +137,9 @@ public class OpaquePartitionedTridentSpoutExecutor implements ICommitterTridentS
                 EmitterPartitionState s = e.getValue();
                 s.rotatingState.removeState(tx.getTransactionId());
                 Object lastMeta = prevCached.get(id);
-                if(lastMeta==null) lastMeta = s.rotatingState.getLastState();
+                if(lastMeta==null) {
+                    lastMeta = s.rotatingState.getLastState();
+                }
                 Object meta = _emitter.emitPartitionBatch(tx, collector, s.partition, lastMeta);
                 metas.put(id, meta);
             }


[2/3] storm git commit: STORM-2691: storm-kafka-client Trident spout does not implement the Trident spout interface properly

Posted by ka...@apache.org.
STORM-2691: storm-kafka-client Trident spout does not implement the Trident spout interface properly


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/a20b2659
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/a20b2659
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/a20b2659

Branch: refs/heads/master
Commit: a20b2659c20b1a12dd455023010f4454d62ee313
Parents: 09e0123
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Aug 18 22:13:38 2017 +0200
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Sat Feb 17 23:18:45 2018 +0100

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  41 ++--
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  44 ++--
 .../ManualPartitionSubscription.java            |  69 ------
 .../spout/subscription/ManualPartitioner.java   |  13 +-
 .../spout/subscription/NamedTopicFilter.java    |   5 +-
 .../spout/subscription/PatternTopicFilter.java  |   6 +-
 .../RoundRobinManualPartitioner.java            |  11 +-
 .../kafka/spout/subscription/Subscription.java  |  57 -----
 .../kafka/spout/subscription/TopicAssigner.java |  51 +++++
 .../kafka/spout/subscription/TopicFilter.java   |   8 +-
 .../trident/KafkaTridentSpoutBatchMetadata.java |  55 ++---
 .../spout/trident/KafkaTridentSpoutEmitter.java | 130 +++++------
 .../spout/trident/KafkaTridentSpoutManager.java | 116 ----------
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  26 +--
 .../KafkaTridentSpoutOpaqueCoordinator.java     |  43 +++-
 .../KafkaTridentSpoutTopicPartition.java        |   6 +-
 ...KafkaTridentSpoutTopicPartitionRegistry.java |  47 ----
 .../trident/KafkaTridentSpoutTransactional.java |  49 -----
 .../kafka/spout/KafkaSpoutAbstractTest.java     |  13 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |   7 +-
 .../KafkaSpoutLogCompactionSupportTest.java     |   5 +-
 .../spout/KafkaSpoutMessagingGuaranteeTest.java |  21 +-
 .../kafka/spout/KafkaSpoutReactivationTest.java |   3 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  67 +++---
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |  15 +-
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   1 -
 .../SpoutWithMockedConsumerSetupHelper.java     |  25 ++-
 .../SingleTopicKafkaSpoutConfiguration.java     |   7 +-
 .../ManualPartitionSubscriptionTest.java        |  79 -------
 .../subscription/NamedTopicFilterTest.java      |   3 +-
 .../subscription/PatternTopicFilterTest.java    |   3 +-
 .../RoundRobinManualPartitionerTest.java        |  76 +++++++
 .../spout/subscription/TopicAssignerTest.java   |  68 ++++++
 .../KafkaTridentSpoutBatchMetadataTest.java     |  13 +-
 .../trident/KafkaTridentSpoutEmitterTest.java   | 214 +++++++++++++++++++
 .../KafkaTridentSpoutOpaqueCoordinatorTest.java | 111 ++++++++++
 pom.xml                                         |  12 +-
 .../spout/IOpaquePartitionedTridentSpout.java   |  21 +-
 .../OpaquePartitionedTridentSpoutExecutor.java  |  10 +-
 39 files changed, 864 insertions(+), 687 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 39bdb93..9f9f5bb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -24,7 +24,6 @@ import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrat
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
@@ -57,6 +56,7 @@ import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -68,7 +68,7 @@ import org.slf4j.LoggerFactory;
 public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private static final long serialVersionUID = 4151921085047987154L;
-    //Initial delay for the commit and subscription refresh timers
+    //Initial delay for the commit and assignment refresh timers
     public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
 
@@ -77,7 +77,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private KafkaConsumerFactory<K, V> kafkaConsumerFactory;
+    private final KafkaConsumerFactory<K, V> kafkaConsumerFactory;
+    private final TopicAssigner topicAssigner;
     private transient KafkaConsumer<K, V> kafkaConsumer;
 
     // Bookkeeping
@@ -99,19 +100,21 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient Set<KafkaSpoutMessageId> emitted;
     // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
     private transient Map<TopicPartition, List<ConsumerRecord<K, V>>> waitingToEmit;
-    // Triggers when a subscription should be refreshed
-    private transient Timer refreshSubscriptionTimer;
+    // Triggers when an assignment should be refreshed
+    private transient Timer refreshAssignmentTimer;
     private transient TopologyContext context;
     private transient CommitMetadataManager commitMetadataManager;
     private transient KafkaOffsetMetric kafkaOffsetMetric;
+    private transient KafkaSpoutConsumerRebalanceListener rebalanceListener;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>(), new TopicAssigner());
     }
 
     @VisibleForTesting
-    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
+    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory, TopicAssigner topicAssigner) {
         this.kafkaConsumerFactory = kafkaConsumerFactory;
+        this.topicAssigner = topicAssigner;
         this.kafkaSpoutConfig = kafkaSpoutConfig;
     }
 
@@ -134,13 +137,15 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             // In at-most-once mode the offsets are committed after every poll, and not periodically as controlled by the timer
             commitTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
-        refreshSubscriptionTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
+        refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
 
         offsetManagers = new HashMap<>();
         emitted = new HashSet<>();
         waitingToEmit = new HashMap<>();
         commitMetadataManager = new CommitMetadataManager(context, kafkaSpoutConfig.getProcessingGuarantee());
 
+        rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
+
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {
             registerMetric();
@@ -267,8 +272,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void nextTuple() {
         try {
-            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
-                kafkaSpoutConfig.getSubscription().refreshAssignment();
+            if (refreshAssignmentTimer.isExpiredResetOnTrue()) {
+                refreshAssignment();
             }
 
             if (commitTimer != null && commitTimer.isExpiredResetOnTrue()) {
@@ -617,16 +622,20 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void activate() {
         try {
-            subscribeKafkaConsumer();
+            kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
+            refreshAssignment();
         } catch (InterruptException e) {
             throwKafkaConsumerInterruptedException();
         }
     }
 
-    private void subscribeKafkaConsumer() {
-        kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
-
-        kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
+    private void refreshAssignment() {
+        Set<TopicPartition> allPartitions = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(kafkaConsumer);
+        List<TopicPartition> allPartitionsSorted = new ArrayList<>(allPartitions);
+        Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE);
+        Set<TopicPartition> assignedPartitions = kafkaSpoutConfig.getTopicPartitioner()
+            .getPartitionsForThisTask(allPartitionsSorted, context);
+        topicAssigner.assignPartitions(kafkaConsumer, assignedPartitions, rebalanceListener);
     }
 
     @Override
@@ -691,7 +700,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private String getTopicsString() {
-        return kafkaSpoutConfig.getSubscription().getTopicsString();
+        return kafkaSpoutConfig.getTopicFilter().getTopicsString();
     }
 
     private static class PollablePartitionsInfo {

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 40e449a..3b7be2b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -33,11 +33,11 @@ import org.apache.kafka.common.serialization.StringDeserializer;
 import org.apache.storm.Config;
 import org.apache.storm.annotation.InterfaceStability;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-import org.apache.storm.kafka.spout.subscription.ManualPartitionSubscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
 import org.apache.storm.kafka.spout.subscription.NamedTopicFilter;
 import org.apache.storm.kafka.spout.subscription.PatternTopicFilter;
 import org.apache.storm.kafka.spout.subscription.RoundRobinManualPartitioner;
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
 import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -75,7 +75,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
 
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
-    private final Subscription subscription;
+    private final TopicFilter topicFilter;
+    private final ManualPartitioner topicPartitioner;
     private final long pollTimeoutMs;
 
     // Kafka spout configuration
@@ -99,7 +100,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public KafkaSpoutConfig(Builder<K, V> builder) {
         setKafkaPropsForProcessingGuarantee(builder);
         this.kafkaProps = builder.kafkaProps;
-        this.subscription = builder.subscription;
+        this.topicFilter = builder.topicFilter;
+        this.topicPartitioner = builder.topicPartitioner;
         this.translator = builder.translator;
         this.pollTimeoutMs = builder.pollTimeoutMs;
         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
@@ -175,7 +177,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     public static class Builder<K, V> {
 
         private final Map<String, Object> kafkaProps;
-        private final Subscription subscription;
+        private final TopicFilter topicFilter;
+        private final ManualPartitioner topicPartitioner;
         private RecordTranslator<K, V> translator;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
@@ -190,31 +193,32 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
 
         public Builder(String bootstrapServers, String... topics) {
-            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+            this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
         }
 
         public Builder(String bootstrapServers, Set<String> topics) {
-            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(),
-                new NamedTopicFilter(topics)));
+            this(bootstrapServers, new NamedTopicFilter(topics), new RoundRobinManualPartitioner());
         }
 
         public Builder(String bootstrapServers, Pattern topics) {
-            this(bootstrapServers, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
+            this(bootstrapServers, new PatternTopicFilter(topics), new RoundRobinManualPartitioner());
         }
 
         /**
          * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
          *
          * @param bootstrapServers The bootstrap servers the consumer will use
-         * @param subscription The subscription defining which topics and partitions each spout instance will read.
+         * @param topicFilter The topic filter defining which topics and partitions the spout will read
+         * @param topicPartitioner The topic partitioner defining which topics and partitions are assigned to each spout task
          */
-        public Builder(String bootstrapServers, Subscription subscription) {
+        public Builder(String bootstrapServers, TopicFilter topicFilter, ManualPartitioner topicPartitioner) {
             kafkaProps = new HashMap<>();
             if (bootstrapServers == null || bootstrapServers.isEmpty()) {
                throw new IllegalArgumentException("bootstrap servers cannot be null or empty");
             }
             kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
-            this.subscription = subscription;
+            this.topicFilter = topicFilter;
+            this.topicPartitioner = topicPartitioner;
             this.translator = new DefaultRecordTranslator<>();
         }
 
@@ -358,9 +362,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         }
 
         /**
-         * Sets partition refresh period in milliseconds. This is how often kafka will be polled to check for new topics and/or new
-         * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
-         * PatternSubscription rely on kafka to handle this instead.
+         * Sets partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
+         * partitions.
          *
          * @param partitionRefreshPeriodMs time in milliseconds
          * @return the builder (this)
@@ -502,8 +505,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return kafkaProps;
     }
 
-    public Subscription getSubscription() {
-        return subscription;
+    public TopicFilter getTopicFilter() {
+        return topicFilter;
+    }
+
+    public ManualPartitioner getTopicPartitioner() {
+        return topicPartitioner;
     }
 
     public RecordTranslator<K, V> getTranslator() {
@@ -566,7 +573,8 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
             + ", maxUncommittedOffsets=" + maxUncommittedOffsets
             + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
-            + ", subscription=" + subscription
+            + ", topicFilter=" + topicFilter
+            + ", topicPartitioner=" + topicPartitioner
             + ", translator=" + translator
             + ", retryService=" + retryService
             + ", tupleListener=" + tupleListener

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
deleted file mode 100644
index 8e74abb..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitionSubscription.java
+++ /dev/null
@@ -1,69 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.subscription;
-
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.TopicPartitionComparator;
-import org.apache.storm.task.TopologyContext;
-
-public class ManualPartitionSubscription extends Subscription {
-    private static final long serialVersionUID = 5633018073527583826L;
-    private final ManualPartitioner partitioner;
-    private final TopicFilter partitionFilter;
-    private transient KafkaConsumer<?, ?> consumer = null;
-    private transient ConsumerRebalanceListener listener = null;
-    private transient TopologyContext context = null;
-
-    public ManualPartitionSubscription(ManualPartitioner parter, TopicFilter partitionFilter) {
-        this.partitionFilter = partitionFilter;
-        this.partitioner = parter;
-    }
-    
-    @Override
-    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
-        this.consumer = consumer;
-        this.listener = listener;
-        this.context = context;
-        refreshAssignment();
-    }
-    
-    @Override
-    public void refreshAssignment() {
-        List<TopicPartition> allPartitions = partitionFilter.getFilteredTopicPartitions(consumer);
-        Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
-        Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
-        Set<TopicPartition> currentAssignment = consumer.assignment();
-        if (!newAssignment.equals(currentAssignment)) {
-            listener.onPartitionsRevoked(currentAssignment);
-            consumer.assign(newAssignment);
-            listener.onPartitionsAssigned(newAssignment);
-        }
-    }
-    
-    @Override
-    public String getTopicsString() {
-        return partitionFilter.getTopicsString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
index 0e35b73..9db0613 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/ManualPartitioner.java
@@ -20,22 +20,25 @@ package org.apache.storm.kafka.spout.subscription;
 
 import java.io.Serializable;
 import java.util.List;
+import java.util.Set;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.task.TopologyContext;
 
 /**
  * A function used to assign partitions to this spout.
- * WARNING if this is not done correctly you can really mess things up, like not reading data in some partitions.
+ * 
+ * <p>WARNING: if this is not done correctly, you can really mess things up, like not reading data in some partitions.
  * The complete TopologyContext is passed in, but it is suggested that you use the index of the spout and the total
  * number of spouts to avoid missing partitions or double assigning partitions.
  */
 @FunctionalInterface
 public interface ManualPartitioner extends Serializable {
     /**
-     * Get the partitions for this assignment.
-     * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
+     * Filter the list of all partitions handled by this set of spouts to get only the partitions assigned to this task.
+     * @param allPartitionsSorted all of the partitions that the set of spouts want to subscribe to
+     *     in a strict ordering that is consistent across tasks
      * @param context the context of the topology
-     * @return the subset of the partitions that this spout should use.
+     * @return the subset of the partitions that this spout task should handle.
      */
-    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context);
+    public Set<TopicPartition> getPartitionsForThisTask(List<TopicPartition> allPartitionsSorted, TopologyContext context);
 }
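
Since ManualPartitioner stays a @FunctionalInterface, a custom assignment policy can be supplied as a lambda. A minimal sketch, assuming a deliberately naive policy invented for illustration:

    import java.util.HashSet;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.ManualPartitioner;

    public class PinnedPartitionerSketch {

        // Gives every partition to task 0 and none to the other tasks.
        // Demonstration only: this defeats load balancing entirely.
        static final ManualPartitioner PIN_TO_FIRST_TASK =
            (allPartitionsSorted, context) ->
                context.getThisTaskIndex() == 0
                    ? new HashSet<>(allPartitionsSorted)
                    : new HashSet<TopicPartition>();
    }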

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
index d6e5fc2..7c25596 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
@@ -16,7 +16,6 @@
 
 package org.apache.storm.kafka.spout.subscription;
 
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -50,8 +49,8 @@ public class NamedTopicFilter implements TopicFilter {
     }
     
     @Override
-    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
-        List<TopicPartition> allPartitions = new ArrayList<>();
+    public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
+        Set<TopicPartition> allPartitions = new HashSet<>();
         for (String topic : topics) {
             for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
                 allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
index 98f8b23..554876f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
@@ -17,6 +17,7 @@
 package org.apache.storm.kafka.spout.subscription;
 
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -25,6 +26,7 @@ import java.util.regex.Pattern;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
 
 /**
  * Filter that returns all partitions for topics matching the given {@link Pattern}.
@@ -44,9 +46,9 @@ public class PatternTopicFilter implements TopicFilter {
     }
 
     @Override
-    public List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer) {
+    public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
         topics.clear();
-        List<TopicPartition> allPartitions = new ArrayList<>();
+        Set<TopicPartition> allPartitions = new HashSet<>();
         for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
             if (pattern.matcher(entry.getKey()).matches()) {
                 for (PartitionInfo partitionInfo : entry.getValue()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
index 9660c77..ee2916a 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/RoundRobinManualPartitioner.java
@@ -18,7 +18,6 @@
 
 package org.apache.storm.kafka.spout.subscription;
 
-import java.util.ArrayList;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
@@ -38,13 +37,13 @@ import org.apache.storm.task.TopologyContext;
 public class RoundRobinManualPartitioner implements ManualPartitioner {
 
     @Override
-    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
+    public Set<TopicPartition> getPartitionsForThisTask(List<TopicPartition> allPartitionsSorted, TopologyContext context) {
         int thisTaskIndex = context.getThisTaskIndex();
         int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
-        Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size() / totalTaskCount + 1);
-        for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
-            myPartitions.add(allPartitions.get(i));
+        Set<TopicPartition> myPartitions = new HashSet<>(allPartitionsSorted.size() / totalTaskCount + 1);
+        for (int i = thisTaskIndex; i < allPartitionsSorted.size(); i += totalTaskCount) {
+            myPartitions.add(allPartitionsSorted.get(i));
         }
-        return new ArrayList<>(myPartitions);
+        return myPartitions;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
deleted file mode 100644
index 6fd8c2b..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/Subscription.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.subscription;
-
-import java.io.Serializable;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.task.TopologyContext;
-
-/**
- * A subscription to kafka.
- */
-public abstract class Subscription implements Serializable {
-    private static final long serialVersionUID = -216136367240198716L;
-
-    /**
-     * Subscribe the KafkaConsumer to the proper topics.
-     * Implementations must ensure that a given topic partition is always assigned to the same spout task.
-     * Adding and removing partitions as necessary is fine, but partitions must not move from one task to another.
-     * This constraint is only important for use with the Trident spout.
-     * @param consumer the Consumer to get.
-     * @param listener the rebalance listener to include in the subscription
-     */
-    public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
-    
-    /**
-     * Get the topics string.
-     * @return A human-readable string representing the subscribed topics.
-     */
-    public abstract String getTopicsString();
-    
-    /**
-     * NOOP is the default behavior, which means that Kafka will internally handle partition assignment.
-     * If you wish to do manual partition management, you must provide an implementation of this method
-     * that will check with kafka for any changes and call the ConsumerRebalanceListener from subscribe
-     * to inform the rest of the system of those changes.
-     */
-    public void refreshAssignment() {
-        //NOOP
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
new file mode 100644
index 0000000..dcc93ce
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.subscription;
+
+import java.io.Serializable;
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Handles assigning partitions to the consumer and updating the rebalance listener.
+ */
+public class TopicAssigner implements Serializable {
+
+    private static final long serialVersionUID = 5423018073527583826L;
+    
+    /**
+     * Assign partitions to the KafkaConsumer.
+     * @param <K> The consumer key type
+     * @param <V> The consumer value type
+     * @param consumer The Kafka consumer to assign partitions to
+     * @param newAssignment The partitions to assign.
+     * @param listener The rebalance listener to call back on when the assignment changes
+     */
+    public <K, V> void assignPartitions(KafkaConsumer<K, V> consumer, Set<TopicPartition> newAssignment,
+        ConsumerRebalanceListener listener) {
+        Set<TopicPartition> currentAssignment = consumer.assignment();
+        if (!newAssignment.equals(currentAssignment)) {
+            listener.onPartitionsRevoked(currentAssignment);
+            consumer.assign(newAssignment);
+            listener.onPartitionsAssigned(newAssignment);
+        }
+    }
+    
+}
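
A usage sketch of the new TopicAssigner, in the Mockito style of the tests in this commit (the partition names are illustrative): when the assignment changes, the listener hears about the revocation before the consumer is reassigned, and about the new assignment afterwards.

    import static org.mockito.Mockito.mock;
    import static org.mockito.Mockito.verify;
    import static org.mockito.Mockito.when;

    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.TopicAssigner;

    public class TopicAssignerSketch {
        public static void main(String[] args) {
            KafkaConsumer<String, String> consumer = mock(KafkaConsumer.class);
            ConsumerRebalanceListener listener = mock(ConsumerRebalanceListener.class);
            Set<TopicPartition> current = Collections.singleton(new TopicPartition("test", 0));
            Set<TopicPartition> next = Collections.singleton(new TopicPartition("test", 1));
            when(consumer.assignment()).thenReturn(current);

            new TopicAssigner().assignPartitions(consumer, next, listener);

            verify(listener).onPartitionsRevoked(current);   // old assignment revoked first
            verify(consumer).assign(next);                   // then the consumer is reassigned
            verify(listener).onPartitionsAssigned(next);     // then the new assignment is announced
        }
    }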

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
index 6af516f..ae2c254 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
@@ -17,18 +17,18 @@
 package org.apache.storm.kafka.spout.subscription;
 
 import java.io.Serializable;
-import java.util.List;
+import java.util.Set;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
 public interface TopicFilter extends Serializable {
     
     /**
-     * Get the Kafka TopicPartitions passed by this filter. 
+     * Get the Kafka TopicPartitions subscribed to by this set of spouts. 
      * @param consumer The Kafka consumer to use to read the list of existing partitions
-     * @return The Kafka partitions passed by this filter.
+     * @return The Kafka partitions this set of spouts should subscribe to.
      */
-    List<TopicPartition> getFilteredTopicPartitions(KafkaConsumer<?, ?> consumer);
+    Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer);
     
     /**
      * Get the topics string.

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
index 9ba76d7..6e56bb5 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -24,8 +24,6 @@ import java.util.List;
 import java.util.Map;
 import org.apache.commons.lang.Validate;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -33,45 +31,39 @@ import org.slf4j.LoggerFactory;
  * Wraps transaction batch information.
  */
 public class KafkaTridentSpoutBatchMetadata implements Serializable {
+
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class);
     private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer();
-    
-    public static final String TOPIC_PARTITION_KEY = "tp";
+
     public static final String FIRST_OFFSET_KEY = "firstOffset";
     public static final String LAST_OFFSET_KEY = "lastOffset";
-    
-    // topic partition of this batch
-    private final TopicPartition topicPartition;  
+
     // first offset of this batch
-    private final long firstOffset;               
+    private final long firstOffset;
     // last offset of this batch
     private final long lastOffset;
 
     /**
      * Builds a metadata object.
-     * @param topicPartition The topic partition
+     *
      * @param firstOffset The first offset for the batch
      * @param lastOffset The last offset for the batch
      */
-    public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long firstOffset, long lastOffset) {
-        this.topicPartition = topicPartition;
+    public KafkaTridentSpoutBatchMetadata(long firstOffset, long lastOffset) {
         this.firstOffset = firstOffset;
         this.lastOffset = lastOffset;
     }
 
     /**
      * Builds a metadata object from a non-empty set of records.
-     * @param topicPartition The topic partition the records belong to.
+     *
      * @param consumerRecords The non-empty set of records.
      */
-    public <K, V> KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, ConsumerRecords<K, V> consumerRecords) {
-        Validate.notNull(consumerRecords.records(topicPartition));
-        List<ConsumerRecord<K, V>> records = consumerRecords.records(topicPartition);
-        Validate.isTrue(!records.isEmpty(), "There must be at least one record in order to build metadata");
-        
-        this.topicPartition = topicPartition;
-        firstOffset = records.get(0).offset();
-        lastOffset = records.get(records.size() - 1).offset();
+    public <K, V> KafkaTridentSpoutBatchMetadata(List<ConsumerRecord<K, V>> consumerRecords) {
+        Validate.isTrue(!consumerRecords.isEmpty(), "There must be at least one record in order to build metadata");
+
+        firstOffset = consumerRecords.get(0).offset();
+        lastOffset = consumerRecords.get(consumerRecords.size() - 1).offset();
         LOG.debug("Created {}", this.toString());
     }
 
@@ -83,28 +75,22 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
         return lastOffset;
     }
 
-    public TopicPartition getTopicPartition() {
-        return topicPartition;
-    }
-    
     /**
      * Constructs a metadata object from a Map in the format produced by {@link #toMap() }.
+     *
      * @param map The source map
      * @return A new metadata object
      */
     public static KafkaTridentSpoutBatchMetadata fromMap(Map<String, Object> map) {
-        Map<String, Object> topicPartitionMap = (Map<String, Object>)map.get(TOPIC_PARTITION_KEY);
-        TopicPartition tp = TP_SERIALIZER.fromMap(topicPartitionMap);
-        return new KafkaTridentSpoutBatchMetadata(tp, ((Number)map.get(FIRST_OFFSET_KEY)).longValue(),
-            ((Number)map.get(LAST_OFFSET_KEY)).longValue());
+        return new KafkaTridentSpoutBatchMetadata(((Number) map.get(FIRST_OFFSET_KEY)).longValue(),
+            ((Number) map.get(LAST_OFFSET_KEY)).longValue());
     }
-    
+
     /**
      * Writes this metadata object to a Map so Trident can read/write it to Zookeeper.
      */
     public Map<String, Object> toMap() {
         Map<String, Object> map = new HashMap<>();
-        map.put(TOPIC_PARTITION_KEY, TP_SERIALIZER.toMap(topicPartition));
         map.put(FIRST_OFFSET_KEY, firstOffset);
         map.put(LAST_OFFSET_KEY, lastOffset);
         return map;
@@ -112,10 +98,9 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
 
     @Override
     public final String toString() {
-        return super.toString()
-                + "{topicPartition=" + topicPartition
-                + ", firstOffset=" + firstOffset
-                + ", lastOffset=" + lastOffset
-                + '}';
+        return "KafkaTridentSpoutBatchMetadata{"
+            + "firstOffset=" + firstOffset
+            + ", lastOffset=" + lastOffset
+            + '}';
     }
 }
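
Since the topic partition no longer lives in the metadata, the Map that Trident stores in ZooKeeper now carries only the two offsets. A round-trip sketch (the offsets are chosen arbitrarily):

    import java.util.Map;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata;

    public class MetadataRoundTripSketch {
        public static void main(String[] args) {
            KafkaTridentSpoutBatchMetadata meta = new KafkaTridentSpoutBatchMetadata(10, 19);
            Map<String, Object> map = meta.toMap();  // {firstOffset=10, lastOffset=19}
            KafkaTridentSpoutBatchMetadata restored = KafkaTridentSpoutBatchMetadata.fromMap(map);
            System.out.println(restored);            // firstOffset=10, lastOffset=19
        }
    }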

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index a45eff8..86535be 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -32,15 +32,18 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
-import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.kafka.spout.RecordTranslator;
-import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.kafka.spout.TopicPartitionComparator;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
@@ -59,49 +62,39 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
 
     // Kafka
     private final KafkaConsumer<K, V> kafkaConsumer;
-
-    // Bookkeeping
-    private final KafkaTridentSpoutManager<K, V> kafkaManager;
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private final TopicAssigner topicAssigner;
+    
     // set of topic-partitions for which first poll has already occurred, and the first polled txid
     private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); 
 
-    // Declare some KafkaTridentSpoutManager references for convenience
     private final long pollTimeoutMs;
     private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
     private final RecordTranslator<K, V> translator;
-    private final Timer refreshSubscriptionTimer;
     private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
-
-    private TopologyContext topologyContext;
+    private final TopologyContext topologyContext;
 
     /**
      * Create a new Kafka spout emitter.
-     * @param kafkaManager The Kafka consumer manager to use
+     * @param kafkaSpoutConfig The kafka spout config
      * @param topologyContext The topology context
-     * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
      */
-    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext,
-            Timer refreshSubscriptionTimer) {
-        this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
-        this.kafkaManager = kafkaManager;
+    public KafkaTridentSpoutEmitter(KafkaSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext) {
+        this(kafkaSpoutConfig, topologyContext, new KafkaConsumerFactoryDefault<>(), new TopicAssigner());
+    }
+    
+    KafkaTridentSpoutEmitter(KafkaSpoutConfig<K, V> kafkaSpoutConfig, TopologyContext topologyContext,
+        KafkaConsumerFactory<K, V> consumerFactory, TopicAssigner topicAssigner) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
+        this.kafkaConsumer = consumerFactory.createConsumer(kafkaSpoutConfig);
         this.topologyContext = topologyContext;
-        this.refreshSubscriptionTimer = refreshSubscriptionTimer;
-        this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
-
-        final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
+        this.translator = kafkaSpoutConfig.getTranslator();
+        this.topicAssigner = topicAssigner;
         this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
         this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
         LOG.debug("Created {}", this.toString());
     }
 
-    /**
-     * Creates instance of this class with default 500 millisecond refresh subscription timer.
-     */
-    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
-        this(kafkaManager, topologyContext, new Timer(500,
-                kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
-    }
-
     @Override
     public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
             KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
@@ -115,11 +108,10 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
         KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
         Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
 
-        if (assignments == null || !assignments.contains(currBatchPartition.getTopicPartition())) {
-            LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], "
-                            + "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] "
-                            + "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
-                    kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
+        if (!assignments.contains(currBatchPartition.getTopicPartition())) {
+            throw new IllegalStateException("The spout is asked to emit tuples for a partition that is not assigned to it."
+                + " This indicates a bug in the TopicFilter or ManualPartitioner implementations."
+                + " The current partition is [" + currBatchPartition + "], the assigned partitions are [" + assignments + "].");
         } else {
             try {
                 // pause other topic-partitions to only poll from current topic-partition
@@ -127,18 +119,13 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
 
                 seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
 
-                // poll
-                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
-                    kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
-                }
-
                 final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
                 LOG.debug("Polled [{}] records from Kafka.", records.count());
 
                 if (!records.isEmpty()) {
                     emitTuples(collector, records);
                     // build new metadata
-                    currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
+                    currentBatch = new KafkaTridentSpoutBatchMetadata(records.records(currBatchTp));
                 }
             } finally {
                 kafkaConsumer.resume(pausedTopicPartitions);
@@ -196,7 +183,7 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
         }
 
         final long fetchOffset = kafkaConsumer.position(tp);
-        LOG.debug("Set [fetchOffset = {}]", fetchOffset);
+        LOG.debug("Set [fetchOffset = {}] for partition [{}]", fetchOffset, tp);
         return fetchOffset;
     }
 
@@ -216,24 +203,12 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
     }
 
     @Override
-    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
-        LOG.trace("Refreshing of topic-partitions handled by Kafka. "
-                + "No action taken by this method for topic partitions {}", partitionResponsibilities);
-    }
-
-    /**
-     * Computes ordered list of topic-partitions for this task taking into consideration that topic-partitions
-     * for this task must be assigned to the Kafka consumer running on this task.
-     *
-     * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
-     * @return ordered list of topic partitions for this task
-     */
-    @Override
     public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
-        List<TopicPartition> allTopicPartitions = allPartitionInfo.stream()
+        List<TopicPartition> sortedPartitions = allPartitionInfo.stream()
             .map(map -> tpSerializer.fromMap(map))
+            .sorted(TopicPartitionComparator.INSTANCE)
             .collect(Collectors.toList());
-        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
+        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(sortedPartitions);
         LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
                 allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
         return allPartitions;
@@ -241,21 +216,31 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
 
     @Override
     public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
-        List<Map<String, Object>> allPartitionInfo) {
-        final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
+        List<KafkaTridentSpoutTopicPartition> allPartitionInfoSorted) {
+        List<TopicPartition> tps = allPartitionInfoSorted.stream()
+            .map(kttp -> kttp.getTopicPartition())
+            .collect(Collectors.toList());
+        final Set<TopicPartition> assignedTps = kafkaSpoutConfig.getTopicPartitioner().getPartitionsForThisTask(tps, topologyContext);
         LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
         final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
-        LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
         return taskTps;
     }
+    
+    
+    @Override
+    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
+        Set<TopicPartition> assignedTps = partitionResponsibilities.stream()
+            .map(kttp -> kttp.getTopicPartition())
+            .collect(Collectors.toSet());
+        topicAssigner.assignPartitions(kafkaConsumer, assignedTps, new KafkaSpoutConsumerRebalanceListener());
+        LOG.debug("Assigned partitions [{}] to this task", assignedTps);
+    }
 
     private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
-        final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps == null ? 0 : tps.size());
-        if (tps != null) {
-            for (TopicPartition tp : tps) {
-                LOG.trace("Added topic-partition [{}]", tp);
-                kttp.add(new KafkaTridentSpoutTopicPartition(tp));
-            }
+        final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps.size());
+        for (TopicPartition tp : tps) {
+            LOG.trace("Added topic-partition [{}]", tp);
+            kttp.add(new KafkaTridentSpoutTopicPartition(tp));
         }
         return kttp;
     }
@@ -273,7 +258,24 @@ public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTrident
     @Override
     public final String toString() {
         return super.toString()
-                + "{kafkaManager=" + kafkaManager
+                + "{kafkaSpoutConfig=" + kafkaSpoutConfig
                 + '}';
     }
+    
+    /**
+     * Just logs partition reassignments.
+     */
+    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            LOG.info("Partitions reassigned. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+        }
+    }
 }
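
For reference, the refactored emitter can now be driven directly, e.g. from a test. A minimal sketch, assuming the package-private constructor added above and Mockito for the consumer and context; the broker address and topic name are illustrative:

    import static org.mockito.Mockito.mock;

    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.subscription.TopicAssigner;
    import org.apache.storm.task.TopologyContext;

    // Sketch only: wire the emitter with a mocked consumer via the factory lambda.
    KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
        .builder("localhost:9092", "test-topic") // illustrative broker and topic
        .build();
    KafkaConsumer<String, String> consumer = mock(KafkaConsumer.class);
    TopologyContext context = mock(TopologyContext.class);
    KafkaTridentSpoutEmitter<String, String> emitter = new KafkaTridentSpoutEmitter<>(
        config, context, kafkaSpoutConfig -> consumer, new TopicAssigner());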

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
deleted file mode 100644
index b5138c2..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
+++ /dev/null
@@ -1,116 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.trident;
-
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.Set;
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.RecordTranslator;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.tuple.Fields;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class KafkaTridentSpoutManager<K, V> implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class);
-
-    // Kafka
-    private transient KafkaConsumer<K, V> kafkaConsumer;
-
-    // Bookkeeping
-    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    // Declare some KafkaSpoutConfig references for convenience
-    private Fields fields;
-
-    /**
-     * Create a KafkaConsumer manager for the trident spout.
-     * @param kafkaSpoutConfig The consumer config
-     */
-    public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this.kafkaSpoutConfig = kafkaSpoutConfig;
-        this.fields = getFields();
-        LOG.debug("Created {}", this.toString());
-    }
-
-    KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext context) {
-        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
-
-        kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
-        return kafkaConsumer;
-    }
-
-    KafkaConsumer<K, V> getKafkaConsumer() {
-        return kafkaConsumer;
-    }
-
-    Set<TopicPartition> getTopicPartitions() {
-        return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
-    }
-
-    final Fields getFields() {
-        if (fields == null) {
-            RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
-            Fields fs = null;
-            for (String stream : translator.streams()) {
-                if (fs == null) {
-                    fs = translator.getFieldsFor(stream);
-                } else {
-                    if (!fs.equals(translator.getFieldsFor(stream))) {
-                        throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
-                    }
-                }
-            }
-            fields = fs;
-        }
-        LOG.debug("OutputFields = {}", fields);
-        return fields;
-    }
-
-    KafkaSpoutConfig<K, V> getKafkaSpoutConfig() {
-        return kafkaSpoutConfig;
-    }
-
-    @Override
-    public final String toString() {
-        return super.toString()
-                + "{kafkaConsumer=" + kafkaConsumer
-                + ", kafkaSpoutConfig=" + kafkaSpoutConfig
-                + '}';
-    }
-
-    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
-        @Override
-        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
-            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
-            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions);
-        }
-
-        @Override
-        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
-            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions);
-            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
-                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index 8d33e39..3257be7 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -20,8 +20,8 @@ package org.apache.storm.kafka.spout.trident;
 
 import java.util.List;
 import java.util.Map;
-import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.RecordTranslator;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.tuple.Fields;
@@ -34,26 +34,22 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
 
-    private final KafkaTridentSpoutManager<K, V> kafkaManager;
-
-    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
-        this(new KafkaTridentSpoutManager<>(conf));
-    }
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
     
-    public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
-        this.kafkaManager = kafkaManager;
+    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
         LOG.debug("Created {}", this.toString());
     }
 
     @Override
     public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
             Map<String, Object> conf, TopologyContext context) {
-        return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
+        return new KafkaTridentSpoutEmitter<>(kafkaSpoutConfig, context);
     }
 
     @Override
     public Coordinator<List<Map<String, Object>>> getCoordinator(Map<String, Object> conf, TopologyContext context) {
-        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
+        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaSpoutConfig);
     }
 
     @Override
@@ -63,7 +59,13 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
 
     @Override
     public Fields getOutputFields() {
-        final Fields outputFields = kafkaManager.getFields();
+        RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+        int numStreams = translator.streams().size();
+        if (numStreams > 1) {
+            throw new IllegalStateException("Trident spouts must have at most one output stream,"
+                + " found streams [" + translator.streams() + "]");
+        }
+        final Fields outputFields = translator.getFieldsFor(translator.streams().get(0));
         LOG.debug("OutputFields = {}", outputFields);
         return outputFields;
     }
@@ -71,6 +73,6 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
     @Override
     public final String toString() {
         return super.toString()
-                + "{kafkaManager=" + kafkaManager + '}';
+                + "{kafkaSpoutConfig=" + kafkaSpoutConfig + '}';
     }
 }
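
With KafkaTridentSpoutManager gone, a topology hands its KafkaSpoutConfig straight to the spout. A minimal usage sketch, assuming the default RecordTranslator's five output fields; the stream name, broker address, and topic are illustrative:

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.builtin.Debug;
    import org.apache.storm.tuple.Fields;

    // Sketch only: build the opaque Trident spout from the config alone.
    KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
        .builder("localhost:9092", "test-topic") // illustrative broker and topic
        .build();
    TridentTopology topology = new TridentTopology();
    topology.newStream("kafka-spout", new KafkaTridentSpoutOpaque<>(config))
        // The default translator emits these five fields per record.
        .each(new Fields("topic", "partition", "offset", "key", "value"), new Debug());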

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
index 17732c2..3aa3a99 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
@@ -19,24 +19,46 @@
 package org.apache.storm.kafka.spout.trident;
 
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
+import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
         Serializable {
+    // Initial delay for the assignment refresh timer.
+    public static final long TIMER_DELAY_MS = 500;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);
 
     private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
-    private final KafkaTridentSpoutManager<K,V> kafkaManager;
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private final Timer refreshAssignmentTimer;
+    private final KafkaConsumer<K, V> consumer;
+    
+    private Set<TopicPartition> partitionsForBatch;
 
-    public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
-        this.kafkaManager = kafkaManager;
+    /**
+     * Creates a new coordinator based on the given spout config.
+     * @param kafkaSpoutConfig The spout config to use
+     */
+    public KafkaTridentSpoutOpaqueCoordinator(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>());
+    }
+    
+    KafkaTridentSpoutOpaqueCoordinator(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> consumerFactory) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
+        this.refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
+        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig);
         LOG.debug("Created {}", this.toString());
     }
 
@@ -48,22 +70,25 @@ public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartition
 
     @Override
     public List<Map<String, Object>> getPartitionsForBatch() {
-        final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
-        LOG.debug("TopicPartitions for batch {}", topicPartitions);
-        return topicPartitions.stream()
+        if (refreshAssignmentTimer.isExpiredResetOnTrue() || partitionsForBatch == null) {
+            partitionsForBatch = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(consumer);
+        }
+        LOG.debug("TopicPartitions for batch {}", partitionsForBatch);
+        return partitionsForBatch.stream()
             .map(tp -> tpSerializer.toMap(tp))
             .collect(Collectors.toList());
     }
 
     @Override
     public void close() {
-        LOG.debug("Closed"); // the "old" trident kafka spout is no op like this
+        this.consumer.close();
+        LOG.debug("Closed");
     }
 
     @Override
     public final String toString() {
         return super.toString()
-                + "{kafkaManager=" + kafkaManager
+                + "{kafkaSpoutConfig=" + kafkaSpoutConfig
                 + '}';
     }
 }
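
The coordinator now discovers partitions through the config's TopicFilter on a timer, instead of reading a process-wide registry, which makes discovery pluggable. A minimal sketch of a custom filter, assuming the interface exposes only the getAllSubscribedPartitions method the coordinator calls above (any further methods on the final interface would need implementing as well):

    import java.util.Set;
    import java.util.stream.Collectors;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.TopicFilter;

    // Sketch only: subscribe to every partition of one fixed topic.
    public class SingleTopicFilter implements TopicFilter {
        private final String topic;

        public SingleTopicFilter(String topic) {
            this.topic = topic;
        }

        @Override
        public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
            // partitionsFor queries the broker, so each timer-driven refresh in the
            // coordinator picks up partitions added to the topic after startup.
            return consumer.partitionsFor(topic).stream()
                .map(info -> new TopicPartition(info.topic(), info.partition()))
                .collect(Collectors.toSet());
        }
    }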

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
index fd14baf..f064b81 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
@@ -26,7 +26,7 @@ import org.apache.storm.trident.spout.ISpoutPartition;
  * {@link ISpoutPartition} that wraps {@link TopicPartition} information.
  */
 public class KafkaTridentSpoutTopicPartition implements ISpoutPartition, Serializable {
-    private TopicPartition topicPartition;
+    private final TopicPartition topicPartition;
 
     public KafkaTridentSpoutTopicPartition(String topic, int partition) {
         this(new TopicPartition(topic, partition));
@@ -56,12 +56,12 @@ public class KafkaTridentSpoutTopicPartition implements ISpoutPartition, Seriali
 
         KafkaTridentSpoutTopicPartition that = (KafkaTridentSpoutTopicPartition) o;
 
-        return topicPartition != null ? topicPartition.equals(that.topicPartition) : that.topicPartition == null;
+        return topicPartition.equals(that.topicPartition);
     }
 
     @Override
     public int hashCode() {
-        return topicPartition != null ? topicPartition.hashCode() : 0;
+        return topicPartition.hashCode();
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
deleted file mode 100644
index 24bc1e2..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.trident;
-
-import java.util.Collection;
-import java.util.Collections;
-import java.util.LinkedHashSet;
-import java.util.Set;
-import org.apache.kafka.common.TopicPartition;
-
-public enum KafkaTridentSpoutTopicPartitionRegistry {
-    INSTANCE;
-
-    private Set<TopicPartition> topicPartitions;
-
-    KafkaTridentSpoutTopicPartitionRegistry() {
-        this.topicPartitions = new LinkedHashSet<>();
-    }
-
-    public Set<TopicPartition> getTopicPartitions() {
-        return Collections.unmodifiableSet(topicPartitions);
-    }
-
-    public void addAll(Collection<? extends TopicPartition> topicPartitions) {
-        this.topicPartitions.addAll(topicPartitions);
-    }
-
-    public void removeAll(Collection<? extends TopicPartition> topicPartitions) {
-        this.topicPartitions.removeAll(topicPartitions);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
deleted file mode 100644
index e41f95d..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.trident;
-
-import java.util.Map;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.trident.spout.IPartitionedTridentSpout;
-import org.apache.storm.trident.spout.ISpoutPartition;
-import org.apache.storm.tuple.Fields;
-
-// TODO
-public class KafkaTridentSpoutTransactional<PartitionsT, P extends ISpoutPartition, T> 
-        implements IPartitionedTridentSpout<PartitionsT, P, T> {
-    @Override
-    public Coordinator<PartitionsT> getCoordinator(Map<String, Object> conf, TopologyContext context) {
-        return null;
-    }
-
-    @Override
-    public Emitter<PartitionsT, P, T> getEmitter(Map<String, Object> conf, TopologyContext context) {
-        return null;
-    }
-
-    @Override
-    public Map<String, Object> getComponentConfiguration() {
-        return null;
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return null;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
index dd29696..a7b3e09 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutAbstractTest.java
@@ -34,7 +34,6 @@ import org.junit.Before;
 import org.junit.Rule;
 import org.mockito.ArgumentCaptor;
 import org.mockito.Captor;
-import org.mockito.MockitoAnnotations;
 
 import java.util.HashMap;
 import java.util.Map;
@@ -47,7 +46,15 @@ import static org.mockito.Mockito.reset;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.verify;
 
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
+import org.mockito.junit.MockitoJUnit;
+import org.mockito.junit.MockitoRule;
+
 public abstract class KafkaSpoutAbstractTest {
+    
+    @Rule
+    public MockitoRule mockito = MockitoJUnit.rule();
+    
     @Rule
     public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();
 
@@ -74,13 +81,11 @@ public abstract class KafkaSpoutAbstractTest {
 
     @Before
     public void setUp() {
-        MockitoAnnotations.initMocks(this);
-
         spoutConfig = createSpoutConfig();
 
         consumerSpy = createConsumerSpy();
 
-        spout = new KafkaSpout<>(spoutConfig, createConsumerFactory());
+        spout = new KafkaSpout<>(spoutConfig, createConsumerFactory(), new TopicAssigner());
 
         simulatedTime = new Time.SimulatedTime();
     }

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
index 317723d..01e2e9f 100755
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutEmitTest.java
@@ -52,8 +52,10 @@ import static org.mockito.ArgumentMatchers.anyLong;
 import static org.mockito.ArgumentMatchers.anyObject;
 import static org.mockito.ArgumentMatchers.anyString;
 import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.mock;
 
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
 
 public class KafkaSpoutEmitTest {
 
@@ -67,7 +70,7 @@ public class KafkaSpoutEmitTest {
 
     @Before
     public void setUp() {
-        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .build();
         consumerMock = mock(KafkaConsumer.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
index f1acafa..01ee9e4 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutLogCompactionSupportTest.java
@@ -52,7 +52,8 @@ import static org.mockito.Mockito.times;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
 
 public class KafkaSpoutLogCompactionSupportTest {
 
@@ -70,7 +71,7 @@ public class KafkaSpoutLogCompactionSupportTest {
     @Before
     public void setUp() {
         MockitoAnnotations.initMocks(this);
-        spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
             .build();
         consumerMock = mock(KafkaConsumer.class);

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
index a9e7c6c..920dedc 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutMessagingGuaranteeTest.java
@@ -44,7 +44,8 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
-import org.apache.storm.kafka.spout.subscription.Subscription;
+import org.apache.storm.kafka.spout.subscription.ManualPartitioner;
+import org.apache.storm.kafka.spout.subscription.TopicFilter;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -77,7 +78,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     @Test
     public void testAtMostOnceModeCommitsBeforeEmit() throws Exception {
         //At-most-once mode must commit tuples before they are emitted to the topology to ensure that a spout crash won't cause replays.
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
             .build();
         KafkaSpout<String, String> spout = SpoutWithMockedConsumerSetupHelper.setupSpout(spoutConfig, conf, contextMock, collectorMock, consumerMock, partition);
@@ -86,7 +87,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
             SpoutWithMockedConsumerSetupHelper.createRecords(partition, 0, 1))));
 
         spout.nextTuple();
-        
+
         when(consumerMock.position(partition)).thenReturn(1L);
 
         //The spout should have emitted the tuple, and must have committed it before emit
@@ -121,7 +122,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     @Test
     public void testAtMostOnceModeDisregardsMaxUncommittedOffsets() throws Exception {
         //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
             .build();
         doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
@@ -130,7 +131,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     @Test
     public void testNoGuaranteeModeDisregardsMaxUncommittedOffsets() throws Exception {
         //The maxUncommittedOffsets limit should not be enforced, since it is only meaningful in at-least-once mode
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .build();
         doTestModeDisregardsMaxUncommittedOffsets(spoutConfig);
@@ -164,7 +165,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     @Test
     public void testAtMostOnceModeCannotReplayTuples() throws Exception {
         //When tuple tracking is enabled, the spout must not replay tuples in at-most-once mode
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
             .setTupleTrackingEnforced(true)
             .build();
@@ -174,7 +175,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     @Test
     public void testNoGuaranteeModeCannotReplayTuples() throws Exception {
         //When tuple tracking is enabled, the spout must not replay tuples in no guarantee mode
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();
@@ -184,7 +185,7 @@ public class KafkaSpoutMessagingGuaranteeTest {
     @Test
     public void testAtMostOnceModeDoesNotCommitAckedTuples() throws Exception {
         //When tuple tracking is enabled, the spout must not commit acked tuples in at-most-once mode because they were committed before being emitted
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE)
             .setTupleTrackingEnforced(true)
             .build();
@@ -214,11 +215,11 @@ public class KafkaSpoutMessagingGuaranteeTest {
             }));
         }
     }
-    
+
     @Test
     public void testNoGuaranteeModeCommitsPolledTuples() throws Exception {
         //When using the no guarantee mode, the spout must commit tuples periodically, regardless of whether they've been acked
-        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(Subscription.class), -1)
+        KafkaSpoutConfig<String, String> spoutConfig = createKafkaSpoutConfigBuilder(mock(TopicFilter.class), mock(ManualPartitioner.class), -1)
             .setProcessingGuarantee(KafkaSpoutConfig.ProcessingGuarantee.NO_GUARANTEE)
             .setTupleTrackingEnforced(true)
             .build();

http://git-wip-us.apache.org/repos/asf/storm/blob/a20b2659/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
index 2273117..5b37a8d 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutReactivationTest.java
@@ -38,6 +38,7 @@ import org.apache.storm.kafka.KafkaUnitRule;
 import org.apache.storm.kafka.spout.config.builder.SingleTopicKafkaSpoutConfiguration;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.subscription.TopicAssigner;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.utils.Time;
@@ -84,7 +85,7 @@ public class KafkaSpoutReactivationTest {
         when(consumerFactoryMock.createConsumer(any()))
             .thenReturn(consumerSpy)
             .thenReturn(postReactivationConsumerSpy);
-        this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock);
+        this.spout = new KafkaSpout<>(spoutConfig, consumerFactoryMock, new TopicAssigner());
     }
 
     private void prepareSpout(int messageCount) throws Exception {


[3/3] storm git commit: Merge branch 'STORM-2691' of https://github.com/srdo/storm into STORM-2691-merge

Posted by ka...@apache.org.
Merge branch 'STORM-2691' of https://github.com/srdo/storm into STORM-2691-merge


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e8e1a4e8
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e8e1a4e8
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e8e1a4e8

Branch: refs/heads/master
Commit: e8e1a4e8f5cf77dce00970a1cdcf6dfd03222194
Parents: 93f9a17 a20b265
Author: Jungtaek Lim <ka...@gmail.com>
Authored: Thu Mar 1 19:35:51 2018 +0900
Committer: Jungtaek Lim <ka...@gmail.com>
Committed: Thu Mar 1 19:35:51 2018 +0900

----------------------------------------------------------------------
 .../apache/storm/kafka/spout/KafkaSpout.java    |  41 ++--
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  44 ++--
 .../ManualPartitionSubscription.java            |  69 ------
 .../spout/subscription/ManualPartitioner.java   |  13 +-
 .../spout/subscription/NamedTopicFilter.java    |   5 +-
 .../spout/subscription/PatternTopicFilter.java  |   6 +-
 .../RoundRobinManualPartitioner.java            |  11 +-
 .../kafka/spout/subscription/Subscription.java  |  57 -----
 .../kafka/spout/subscription/TopicAssigner.java |  51 +++++
 .../kafka/spout/subscription/TopicFilter.java   |   8 +-
 .../trident/KafkaTridentSpoutBatchMetadata.java |  55 ++---
 .../spout/trident/KafkaTridentSpoutEmitter.java | 130 +++++------
 .../spout/trident/KafkaTridentSpoutManager.java | 116 ----------
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  26 +--
 .../KafkaTridentSpoutOpaqueCoordinator.java     |  43 +++-
 .../KafkaTridentSpoutTopicPartition.java        |   6 +-
 ...KafkaTridentSpoutTopicPartitionRegistry.java |  47 ----
 .../trident/KafkaTridentSpoutTransactional.java |  49 -----
 .../kafka/spout/KafkaSpoutAbstractTest.java     |  13 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |   7 +-
 .../KafkaSpoutLogCompactionSupportTest.java     |   5 +-
 .../spout/KafkaSpoutMessagingGuaranteeTest.java |  21 +-
 .../kafka/spout/KafkaSpoutReactivationTest.java |   3 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  67 +++---
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |  15 +-
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   1 -
 .../SpoutWithMockedConsumerSetupHelper.java     |  25 ++-
 .../SingleTopicKafkaSpoutConfiguration.java     |   7 +-
 .../ManualPartitionSubscriptionTest.java        |  79 -------
 .../subscription/NamedTopicFilterTest.java      |   3 +-
 .../subscription/PatternTopicFilterTest.java    |   3 +-
 .../RoundRobinManualPartitionerTest.java        |  76 +++++++
 .../spout/subscription/TopicAssignerTest.java   |  68 ++++++
 .../KafkaTridentSpoutBatchMetadataTest.java     |  13 +-
 .../trident/KafkaTridentSpoutEmitterTest.java   | 214 +++++++++++++++++++
 .../KafkaTridentSpoutOpaqueCoordinatorTest.java | 111 ++++++++++
 pom.xml                                         |  12 +-
 .../spout/IOpaquePartitionedTridentSpout.java   |  21 +-
 .../OpaquePartitionedTridentSpoutExecutor.java  |  10 +-
 39 files changed, 864 insertions(+), 687 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e8e1a4e8/pom.xml
----------------------------------------------------------------------