Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/13 12:26:42 UTC

[2/3] storm git commit: STORM-2974: Add transactional spout to storm-kafka-client

STORM-2974: Add transactional spout to storm-kafka-client


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/ba526077
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/ba526077
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/ba526077

Branch: refs/heads/master
Commit: ba5260774bbd436e81da14a97c072e399a70a896
Parents: 2d7c7d3
Author: Stig Rohde Døssing <sr...@apache.org>
Authored: Fri Mar 9 20:57:53 2018 +0100
Committer: Stig Rohde Døssing <sr...@apache.org>
Committed: Tue Jul 10 11:27:47 2018 +0200

----------------------------------------------------------------------
 bin/storm.py                                    |   4 +-
 docs/Command-line-client.md                     |   2 +-
 docs/Transactional-topologies.md                |   6 +-
 docs/Trident-state.md                           |   4 +-
 docs/flux.md                                    |   2 +-
 .../TridentKafkaClientTopologyNamedTopics.java  |  16 +-
 .../kafka/spout/EmptyKafkaTupleListener.java    |   1 -
 .../apache/storm/kafka/spout/KafkaSpout.java    |  90 ++++----
 .../storm/kafka/spout/KafkaSpoutConfig.java     |   1 -
 .../storm/kafka/spout/KafkaTupleListener.java   |   1 -
 .../kafka/spout/internal/ConsumerFactory.java   |  28 +++
 .../spout/internal/ConsumerFactoryDefault.java  |  29 +++
 .../spout/internal/KafkaConsumerFactory.java    |  28 ---
 .../internal/KafkaConsumerFactoryDefault.java   |  29 ---
 .../kafka/spout/internal/OffsetManager.java     |   2 -
 .../kafka/spout/metrics/KafkaOffsetMetric.java  |  15 +-
 .../spout/subscription/NamedTopicFilter.java    |   4 +-
 .../spout/subscription/PatternTopicFilter.java  |   7 +-
 .../kafka/spout/subscription/TopicAssigner.java |   5 +-
 .../kafka/spout/subscription/TopicFilter.java   |   4 +-
 .../trident/KafkaTridentOpaqueSpoutEmitter.java |  68 ++++++
 .../trident/KafkaTridentSpoutBatchMetadata.java |  23 +-
 .../trident/KafkaTridentSpoutCoordinator.java   |  97 ++++++++
 .../spout/trident/KafkaTridentSpoutEmitter.java | 219 ++++++++++++-------
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  18 +-
 .../KafkaTridentSpoutOpaqueCoordinator.java     |  94 --------
 .../trident/KafkaTridentSpoutTransactional.java |  65 ++++++
 .../KafkaTridentTransactionalSpoutEmitter.java  |  68 ++++++
 .../trident/internal/OutputFieldsExtractor.java |  41 ++++
 .../kafka/spout/KafkaSpoutAbstractTest.java     |  11 +-
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   |  10 +-
 .../KafkaSpoutLogCompactionSupportTest.java     |  39 ++--
 .../kafka/spout/KafkaSpoutReactivationTest.java |  12 +-
 .../kafka/spout/KafkaSpoutRebalanceTest.java    |  32 +--
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   |  19 +-
 .../kafka/spout/MaxUncommittedOffsetTest.java   |   7 +-
 .../spout/SingleTopicKafkaUnitSetupHelper.java  |   4 +-
 .../SpoutWithMockedConsumerSetupHelper.java     |   4 +-
 .../KafkaTridentSpoutBatchMetadataTest.java     |  10 +-
 .../trident/KafkaTridentSpoutEmitterTest.java   | 147 +++++++++----
 .../KafkaTridentSpoutOpaqueCoordinatorTest.java |   9 +-
 41 files changed, 833 insertions(+), 442 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/bin/storm.py
----------------------------------------------------------------------
diff --git a/bin/storm.py b/bin/storm.py
index 23bab73..3767b45 100755
--- a/bin/storm.py
+++ b/bin/storm.py
@@ -361,7 +361,7 @@ def jar(jarfile, klass, *args):
     And when you want to ship maven artifacts and its transitive dependencies, you can pass them to --artifacts with comma-separated string.
     You can also exclude some dependencies like what you're doing in maven pom.
     Please add exclusion artifacts with '^' separated string after the artifact.
-    For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" will load jedis and kafka artifact and all of transitive dependencies but exclude slf4j-log4j12 from kafka.
+    For example, --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" will load jedis and kafka-clients artifact and all of transitive dependencies but exclude slf4j-api from kafka.
 
     When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with comma-separated string.
     Repository format is "<name>^<url>". '^' is taken as separator because URL allows various characters.
@@ -373,7 +373,7 @@ def jar(jarfile, klass, *args):
     --proxyUsername: username of proxy if it requires basic auth
     --proxyPassword: password of proxy if it requires basic auth
 
-    Complete example of options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
+    Complete example of options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
 
     When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/docs/Command-line-client.md
----------------------------------------------------------------------
diff --git a/docs/Command-line-client.md b/docs/Command-line-client.md
index 7bc678d..2348e3f 100644
--- a/docs/Command-line-client.md
+++ b/docs/Command-line-client.md
@@ -50,7 +50,7 @@ And when you want to ship maven artifacts and its transitive dependencies, you c
 
 When you need to pull the artifacts from other than Maven Central, you can pass remote repositories to --artifactRepositories option with comma-separated string. Repository format is "<name>^<url>". '^' is taken as separator because URL allows various characters. For example, --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/" will add JBoss and HDP repositories for dependency resolver.
 
-Complete example of both options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka/storm-kafka-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka_2.10:0.8.2.2^org.slf4j:slf4j-log4j12" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
+Complete example of both options is here: `./bin/storm jar example/storm-starter/storm-starter-topologies-*.jar org.apache.storm.starter.RollingTopWords blobstore-remote2 remote --jars "./external/storm-redis/storm-redis-1.1.0.jar,./external/storm-kafka-client/storm-kafka-client-1.1.0.jar" --artifacts "redis.clients:jedis:2.9.0,org.apache.kafka:kafka-clients:1.0.0^org.slf4j:slf4j-api" --artifactRepositories "jboss-repository^http://repository.jboss.com/maven2,HDPRepo^http://repo.hortonworks.com/content/groups/public/"`
 
 When you pass jars and/or artifacts options, StormSubmitter will upload them when the topology is submitted, and they will be included to classpath of both the process which runs the class, and also workers for that topology.
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/docs/Transactional-topologies.md
----------------------------------------------------------------------
diff --git a/docs/Transactional-topologies.md b/docs/Transactional-topologies.md
index db5509f..612ac32 100644
--- a/docs/Transactional-topologies.md
+++ b/docs/Transactional-topologies.md
@@ -77,7 +77,7 @@ When using transactional topologies, Storm does the following for you:
 3. *Fault detection:* Storm leverages the acking framework to efficiently determine when a batch has successfully processed, successfully committed, or failed. Storm will then replay batches appropriately. You don't have to do any acking or anchoring -- Storm manages all of this for you.
 4. *First class batch processing API*: Storm layers an API on top of regular bolts to allow for batch processing of tuples. Storm manages all the coordination for determining when a task has received all the tuples for that particular transaction. Storm will also take care of cleaning up any accumulated state for each transaction (like the partial counts).
 
-Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like [Kestrel](https://github.com/robey/kestrel) can't do this. [Apache Kafka](http://incubator.apache.org/kafka/index.html) is a perfect fit for this kind of spout, and [storm-kafka](https://github.com/apache/storm/tree/master/external/storm-kafka) contains a transactional spout implementation for Kafka.
+Finally, another thing to note is that transactional topologies require a source queue that can replay an exact batch of messages. Technologies like [Kestrel](https://github.com/robey/kestrel) can't do this. [Apache Kafka](http://incubator.apache.org/kafka/index.html) is a perfect fit for this kind of spout, and [storm-kafka-client](https://github.com/apache/storm/tree/master/external/storm-kafka-client) contains a transactional spout implementation for Kafka.
 
 ## The basics through example
 
@@ -255,7 +255,7 @@ The details of implementing a `TransactionalSpout` are in [the Javadoc](javadocs
 
 #### Partitioned Transactional Spout
 
-A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how [TransactionalKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/TransactionalKafkaSpout.java) works. An `IPartitionedTransactionalSpout` automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability. See [the Javadoc](javadocs/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.html) for more details.
+A common kind of transactional spout is one that reads the batches from a set of partitions across many queue brokers. For example, this is how [KafkaTridentSpoutTransactional]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java) works. An `IPartitionedTransactionalSpout` automates the bookkeeping work of managing the state for each partition to ensure idempotent replayability. See [the Javadoc](javadocs/org/apache/storm/transactional/partitioned/IPartitionedTransactionalSpout.html) for more details.
 
 ### Configuration
 
@@ -325,7 +325,7 @@ In this scenario, tuples 41-50 are skipped. By failing all subsequent transactio
 
 By failing all subsequent transactions on failure, no tuples are skipped. This also shows that a requirement of transactional spouts is that they always emit where the last transaction left off.
 
-A non-idempotent transactional spout is more concisely referred to as an "OpaqueTransactionalSpout" (opaque is the opposite of idempotent). [IOpaquePartitionedTransactionalSpout](javadocs/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.html) is an interface for implementing opaque partitioned transactional spouts, of which [OpaqueTransactionalKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/OpaqueTransactionalKafkaSpout.java) is an example. `OpaqueTransactionalKafkaSpout` can withstand losing individual Kafka nodes without sacrificing accuracy as long as you use the update strategy as explained in this section.
+A non-idempotent transactional spout is more concisely referred to as an "OpaqueTransactionalSpout" (opaque is the opposite of idempotent). [IOpaquePartitionedTransactionalSpout](javadocs/org/apache/storm/transactional/partitioned/IOpaquePartitionedTransactionalSpout.html) is an interface for implementing opaque partitioned transactional spouts, of which [KafkaTridentSpoutOpaque]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Trident/KafkaTridentSpoutOpaque.java) is an example. `KafkaTridentSpoutOpaque` can withstand losing individual Kafka nodes without sacrificing accuracy as long as you use the update strategy as explained in this section.
 
 ## Implementation
 

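[Editorial note, not part of the commit] To make the documentation change above concrete, the following is a minimal sketch of wiring the new transactional spout into a Trident topology. The broker address, topic, group id, stream name, and class name are placeholders, and the "value" field assumes the default record translator; only KafkaSpoutConfig.builder/setProp and the KafkaTridentSpoutTransactional constructor are taken from this diff.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.Config;
    import org.apache.storm.StormSubmitter;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTransactional;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.builtin.Debug;
    import org.apache.storm.tuple.Fields;

    public class TransactionalSpoutSketch {
        public static void main(String[] args) throws Exception {
            KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
                .builder("localhost:9092", "my-topic")                        // placeholder broker and topic
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "trident-tx-group")  // placeholder group id
                .build();

            TridentTopology topology = new TridentTopology();
            // A transactional emitter must replay exactly the same batch for a given txid,
            // which is the property the doc section above relies on.
            topology.newStream("kafka-tx-stream", new KafkaTridentSpoutTransactional<>(spoutConfig))
                .each(new Fields("value"), new Debug());                      // "value" assumes the default record translator

            StormSubmitter.submitTopology("transactional-consumer", new Config(), topology.build());
        }
    }
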
http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/docs/Trident-state.md
----------------------------------------------------------------------
diff --git a/docs/Trident-state.md b/docs/Trident-state.md
index a89dc3c..ead8d86 100644
--- a/docs/Trident-state.md
+++ b/docs/Trident-state.md
@@ -28,7 +28,7 @@ Remember, Trident processes tuples as small batches with each batch being given
 2. There's no overlap between batches of tuples (tuples are in one batch or another, never multiple).
 3. Every tuple is in a batch (no tuples are skipped)
 
-This is a pretty easy type of spout to understand, the stream is divided into fixed batches that never change. storm-contrib has [an implementation of a transactional spout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TransactionalTridentKafkaSpout.java) for Kafka.
+This is a pretty easy type of spout to understand, the stream is divided into fixed batches that never change. Storm has [an implementation of a transactional spout]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional) for Kafka.
 
 You might be wondering – why wouldn't you just always use a transactional spout? They're simple and easy to understand. One reason you might not use one is because they're not necessarily very fault-tolerant. For example, the way TransactionalTridentKafkaSpout works is the batch for a txid will contain tuples from all the Kafka partitions for a topic. Once a batch has been emitted, any time that batch is re-emitted in the future the exact same set of tuples must be emitted to meet the semantics of transactional spouts. Now suppose a batch is emitted from TransactionalTridentKafkaSpout, the batch fails to process, and at the same time one of the Kafka nodes goes down. You're now incapable of replaying the same batch as you did before (since the node is down and some partitions for the topic are not unavailable), and processing will halt. 
 
@@ -72,7 +72,7 @@ As described before, an opaque transactional spout cannot guarantee that the bat
 
 1. Every tuple is *successfully* processed in exactly one batch. However, it's possible for a tuple to fail to process in one batch and then succeed to process in a later batch.
 
-[OpaqueTridentKafkaSpout]({{page.git-tree-base}}/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/OpaqueTridentKafkaSpout.java) is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it's time for OpaqueTridentKafkaSpout to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches.
+[KafkaTridentSpoutOpaque]({{page.git-tree-base}}/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java) is a spout that has this property and is fault-tolerant to losing Kafka nodes. Whenever it's time for KafkaTridentSpoutOpaque to emit a batch, it emits tuples starting from where the last batch finished emitting. This ensures that no tuple is ever skipped or successfully processed by multiple batches.
 
 With opaque transactional spouts, it's no longer possible to use the trick of skipping state updates if the transaction id in the database is the same as the transaction id for the current batch. This is because the batch may have changed between state updates.
 

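[Editorial note, not part of the commit] For contrast with the transactional case, a minimal sketch of driving a Trident state update from the opaque spout (the word-count style pattern this page describes) could look like the following. The topic, group id, and the "key" field name are assumptions, and MemoryMapState is an in-memory state intended for testing only.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
    import org.apache.storm.trident.TridentTopology;
    import org.apache.storm.trident.operation.builtin.Count;
    import org.apache.storm.trident.testing.MemoryMapState;
    import org.apache.storm.tuple.Fields;

    public class OpaqueStateSketch {
        public static TridentTopology build() {
            KafkaSpoutConfig<String, String> spoutConfig = KafkaSpoutConfig
                .builder("localhost:9092", "my-topic")                            // placeholder broker and topic
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "trident-opaque-group")  // placeholder group id
                .build();

            TridentTopology topology = new TridentTopology();
            topology.newStream("kafka-opaque-stream", new KafkaTridentSpoutOpaque<>(spoutConfig))
                // Group by the record key ("key" assumes the default record translator) and keep a count per key;
                // the opaque spout keeps the count correct even if a batch changes on replay.
                .groupBy(new Fields("key"))
                .persistentAggregate(new MemoryMapState.Factory(), new Count(), new Fields("count"));
            return topology;
        }
    }
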
http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/docs/flux.md
----------------------------------------------------------------------
diff --git a/docs/flux.md b/docs/flux.md
index b1b359a..69cbb20 100644
--- a/docs/flux.md
+++ b/docs/flux.md
@@ -31,7 +31,7 @@ the layout and configuration of your topologies.
    in your topology code
  * Support for existing topology code (see below)
  * Define Storm Core API (Spouts/Bolts) using a flexible YAML DSL
- * YAML DSL support for most Storm components (storm-kafka, storm-hdfs, storm-hbase, etc.)
+ * YAML DSL support for most Storm components (storm-kafka-client, storm-hdfs, storm-hbase, etc.)
  * Convenient support for multi-lang components
  * External property substitution/filtering for easily switching between configurations/environments (similar to Maven-style
    `${variable.name}` substitution)

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
----------------------------------------------------------------------
diff --git a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
index 4f3dd38..6fecb5c 100644
--- a/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
+++ b/examples/storm-kafka-client-examples/src/main/java/org/apache/storm/kafka/trident/TridentKafkaClientTopologyNamedTopics.java
@@ -37,6 +37,8 @@ import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
 import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
+import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutTransactional;
+import org.apache.storm.trident.spout.ITridentDataSource;
 import org.apache.storm.tuple.Fields;
 import org.apache.storm.tuple.Values;
 
@@ -53,6 +55,11 @@ public class TridentKafkaClientTopologyNamedTopics {
     private KafkaTridentSpoutOpaque<String, String> newKafkaTridentSpoutOpaque(KafkaSpoutConfig<String, String> spoutConfig) {
         return new KafkaTridentSpoutOpaque<>(spoutConfig);
     }
+    
+    private KafkaTridentSpoutTransactional<String, String> newKafkaTridentSpoutTransactional(
+        KafkaSpoutConfig<String, String> spoutConfig) {
+        return new KafkaTridentSpoutTransactional<>(spoutConfig);
+    }
 
     private static final Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new JustValueFunc();
 
@@ -66,7 +73,7 @@ public class TridentKafkaClientTopologyNamedTopics {
             return new Values(record.value());
         }
     }
-
+    
     protected KafkaSpoutConfig<String, String> newKafkaSpoutConfig(String bootstrapServers) {
         return KafkaSpoutConfig.builder(bootstrapServers, TOPIC_1, TOPIC_2)
             .setProp(ConsumerConfig.GROUP_ID_CONFIG, "kafkaSpoutTestGroup_" + System.nanoTime())
@@ -91,7 +98,8 @@ public class TridentKafkaClientTopologyNamedTopics {
     protected void run(String[] args) throws AlreadyAliveException, InvalidTopologyException,
         AuthorizationException, InterruptedException {
         final String brokerUrl = args.length > 0 ? args[0] : KAFKA_LOCAL_BROKER;
-        System.out.println("Running with broker url " + brokerUrl);
+        final boolean isOpaque = args.length > 1 ? Boolean.parseBoolean(args[1]) : true;
+        System.out.println("Running with broker url " + brokerUrl + " and isOpaque=" + isOpaque);
 
         Config tpConf = new Config();
         tpConf.setDebug(true);
@@ -101,7 +109,9 @@ public class TridentKafkaClientTopologyNamedTopics {
         StormSubmitter.submitTopology(TOPIC_1 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_1));
         StormSubmitter.submitTopology(TOPIC_2 + "-producer", tpConf, KafkaProducerTopology.newTopology(brokerUrl, TOPIC_2));
         // Consumer
+        KafkaSpoutConfig<String, String> spoutConfig = newKafkaSpoutConfig(brokerUrl);
+        ITridentDataSource spout = isOpaque ? newKafkaTridentSpoutOpaque(spoutConfig) : newKafkaTridentSpoutTransactional(spoutConfig);
         StormSubmitter.submitTopology("topics-consumer", tpConf,
-            TridentKafkaConsumerTopology.newTopology(newKafkaTridentSpoutOpaque(newKafkaSpoutConfig(brokerUrl))));
+            TridentKafkaConsumerTopology.newTopology(spout));
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
index 621fecd..a9a5cc5 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
@@ -22,7 +22,6 @@ package org.apache.storm.kafka.spout;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.task.TopologyContext;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index f5969af..9aeba6b 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -36,8 +36,8 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.Collectors;
-
 import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -49,8 +49,8 @@ import org.apache.kafka.common.errors.RetriableException;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
 import org.apache.storm.kafka.spout.internal.CommitMetadataManager;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
-import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.ConsumerFactory;
+import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.kafka.spout.internal.Timer;
 import org.apache.storm.kafka.spout.metrics.KafkaOffsetMetric;
@@ -75,9 +75,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     // Kafka
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
-    private final KafkaConsumerFactory<K, V> kafkaConsumerFactory;
+    private final ConsumerFactory<K, V> kafkaConsumerFactory;
     private final TopicAssigner topicAssigner;
-    private transient KafkaConsumer<K, V> kafkaConsumer;
+    private transient Consumer<K, V> consumer;
 
     // Bookkeeping
     // Strategy to determine the fetch offset of the first realized by the spout upon activation
@@ -106,11 +106,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient KafkaSpoutConsumerRebalanceListener rebalanceListener;
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        this(kafkaSpoutConfig, new KafkaConsumerFactoryDefault<>(), new TopicAssigner());
+        this(kafkaSpoutConfig, new ConsumerFactoryDefault<>(), new TopicAssigner());
     }
 
     @VisibleForTesting
-    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory, TopicAssigner topicAssigner) {
+    KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> kafkaConsumerFactory, TopicAssigner topicAssigner) {
         this.kafkaConsumerFactory = kafkaConsumerFactory;
         this.topicAssigner = topicAssigner;
         this.kafkaSpoutConfig = kafkaSpoutConfig;
@@ -144,7 +144,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         rebalanceListener = new KafkaSpoutConsumerRebalanceListener();
 
-        kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
+        consumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
 
         tupleListener.open(conf, context);
         if (canRegisterMetrics()) {
@@ -156,7 +156,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     private void registerMetric() {
         LOG.info("Registering Spout Metrics");
-        kafkaOffsetMetric = new KafkaOffsetMetric<>(() -> Collections.unmodifiableMap(offsetManagers), () -> kafkaConsumer);
+        kafkaOffsetMetric = new KafkaOffsetMetric(() -> Collections.unmodifiableMap(offsetManagers), () -> consumer);
         context.registerMetric("kafkaOffset", kafkaOffsetMetric, kafkaSpoutConfig.getMetricsTimeBucketSizeInSecs());
     }
 
@@ -184,7 +184,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             previousAssignment = partitions;
 
             LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
-                kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+                kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
 
             if (isAtLeastOnceProcessing()) {
                 commitOffsetsForAckedTuples();
@@ -194,7 +194,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         @Override
         public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
             LOG.info("Partitions reassignment. [task-ID={}, consumer-group={}, consumer={}, topic-partitions={}]",
-                context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+                context.getThisTaskId(), kafkaSpoutConfig.getConsumerGroupId(), consumer, partitions);
 
             initialize(partitions);
             tupleListener.onPartitionsReassigned(partitions);
@@ -217,7 +217,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             Set<TopicPartition> newPartitions = new HashSet<>(partitions);
             newPartitions.removeAll(previousAssignment);
             for (TopicPartition newTp : newPartitions) {
-                final OffsetAndMetadata committedOffset = kafkaConsumer.committed(newTp);
+                final OffsetAndMetadata committedOffset = consumer.committed(newTp);
                 final long fetchOffset = doSeek(newTp, committedOffset);
                 LOG.debug("Set consumer position to [{}] for topic-partition [{}] with [{}] and committed offset [{}]",
                     fetchOffset, newTp, firstPollOffsetStrategy, committedOffset);
@@ -242,29 +242,29 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     committedOffset,
                     Collections.unmodifiableMap(offsetManagers))) {
                     // Another KafkaSpout instance (of this topology) already committed, therefore FirstPollOffsetStrategy does not apply.
-                    kafkaConsumer.seek(newTp, committedOffset.offset());
+                    consumer.seek(newTp, committedOffset.offset());
                 } else {
                     // offset was not committed by this topology, therefore FirstPollOffsetStrategy applies
                     // (only when the topology is first deployed).
                     if (firstPollOffsetStrategy.equals(EARLIEST)) {
-                        kafkaConsumer.seekToBeginning(Collections.singleton(newTp));
+                        consumer.seekToBeginning(Collections.singleton(newTp));
                     } else if (firstPollOffsetStrategy.equals(LATEST)) {
-                        kafkaConsumer.seekToEnd(Collections.singleton(newTp));
+                        consumer.seekToEnd(Collections.singleton(newTp));
                     } else {
                         // Resume polling at the last committed offset, i.e. the first offset that is not marked as processed.
-                        kafkaConsumer.seek(newTp, committedOffset.offset());
+                        consumer.seek(newTp, committedOffset.offset());
                     }
                 }
             } else {
                 // no offset commits have ever been done for this consumer group and topic-partition,
                 // so start at the beginning or end depending on FirstPollOffsetStrategy
                 if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
-                    kafkaConsumer.seekToBeginning(Collections.singleton(newTp));
+                    consumer.seekToBeginning(Collections.singleton(newTp));
                 } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
-                    kafkaConsumer.seekToEnd(Collections.singleton(newTp));
+                    consumer.seekToEnd(Collections.singleton(newTp));
                 }
             }
-            return kafkaConsumer.position(newTp);
+            return consumer.position(newTp);
         }
     }
 
@@ -280,9 +280,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 if (isAtLeastOnceProcessing()) {
                     commitOffsetsForAckedTuples();
                 } else if (kafkaSpoutConfig.getProcessingGuarantee() == ProcessingGuarantee.NO_GUARANTEE) {
-                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
-                        createFetchedOffsetsMetadata(kafkaConsumer.assignment());
-                    kafkaConsumer.commitAsync(offsetsToCommit, null);
+                    Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                        createFetchedOffsetsMetadata(consumer.assignment());
+                    consumer.commitAsync(offsetsToCommit, null);
                     LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
                 }
             }
@@ -314,7 +314,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             return new PollablePartitionsInfo(Collections.emptySet(), Collections.emptyMap());
         }
 
-        Set<TopicPartition> assignment = kafkaConsumer.assignment();
+        Set<TopicPartition> assignment = consumer.assignment();
         if (!isAtLeastOnceProcessing()) {
             return new PollablePartitionsInfo(assignment, Collections.emptyMap());
         }
@@ -357,32 +357,32 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // ======== poll =========
     private ConsumerRecords<K, V> pollKafkaBroker(PollablePartitionsInfo pollablePartitionsInfo) {
         doSeekRetriableTopicPartitions(pollablePartitionsInfo.pollableEarliestRetriableOffsets);
-        Set<TopicPartition> pausedPartitions = new HashSet<>(kafkaConsumer.assignment());
+        Set<TopicPartition> pausedPartitions = new HashSet<>(consumer.assignment());
         pausedPartitions.removeIf(pollablePartitionsInfo.pollablePartitions::contains);
         try {
-            kafkaConsumer.pause(pausedPartitions);
-            final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+            consumer.pause(pausedPartitions);
+            final ConsumerRecords<K, V> consumerRecords = consumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
             ackRetriableOffsetsIfCompactedAway(pollablePartitionsInfo.pollableEarliestRetriableOffsets, consumerRecords);
             final int numPolledRecords = consumerRecords.count();
             LOG.debug("Polled [{}] records from Kafka",
                 numPolledRecords);
             if (kafkaSpoutConfig.getProcessingGuarantee() == KafkaSpoutConfig.ProcessingGuarantee.AT_MOST_ONCE) {
                 //Commit polled records immediately to ensure delivery is at-most-once.
-                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = 
-                    createFetchedOffsetsMetadata(kafkaConsumer.assignment());
-                kafkaConsumer.commitSync(offsetsToCommit);
+                Map<TopicPartition, OffsetAndMetadata> offsetsToCommit =
+                    createFetchedOffsetsMetadata(consumer.assignment());
+                consumer.commitSync(offsetsToCommit);
                 LOG.debug("Committed offsets {} to Kafka", offsetsToCommit);
             }
             return consumerRecords;
         } finally {
-            kafkaConsumer.resume(pausedPartitions);
+            consumer.resume(pausedPartitions);
         }
     }
 
     private void doSeekRetriableTopicPartitions(Map<TopicPartition, Long> pollableEarliestRetriableOffsets) {
         for (Entry<TopicPartition, Long> retriableTopicPartitionAndOffset : pollableEarliestRetriableOffsets.entrySet()) {
             //Seek directly to the earliest retriable message for each retriable topic partition
-            kafkaConsumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
+            consumer.seek(retriableTopicPartitionAndOffset.getKey(), retriableTopicPartitionAndOffset.getValue());
         }
     }
 
@@ -471,8 +471,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     return true;
                 }
             } else {
-                /*if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately
-                * to allow its offset to be commited to Kafka*/
+                /*
+                 * if a null tuple is not configured to be emitted, it should be marked as emitted and acked immediately to allow its offset
+                 * to be commited to Kafka
+                 */
                 LOG.debug("Not emitting null tuple for record [{}] as defined in configuration.", record);
                 if (isAtLeastOnceProcessing()) {
                     msgId.setNullTuple(true);
@@ -494,11 +496,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private Map<TopicPartition, OffsetAndMetadata> createFetchedOffsetsMetadata(Set<TopicPartition> assignedPartitions) {
         Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
         for (TopicPartition tp : assignedPartitions) {
-            offsetsToCommit.put(tp, new OffsetAndMetadata(kafkaConsumer.position(tp), commitMetadataManager.getCommitMetadata()));
+            offsetsToCommit.put(tp, new OffsetAndMetadata(consumer.position(tp), commitMetadataManager.getCommitMetadata()));
         }
         return offsetsToCommit;
     }
-    
+
     private void commitOffsetsForAckedTuples() {
         final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
         for (Map.Entry<TopicPartition, OffsetManager> tpOffset : offsetManagers.entrySet()) {
@@ -510,14 +512,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         // Commit offsets that are ready to be committed for every topic partition
         if (!nextCommitOffsets.isEmpty()) {
-            kafkaConsumer.commitSync(nextCommitOffsets);
+            consumer.commitSync(nextCommitOffsets);
             LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
             // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
             // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
             for (Map.Entry<TopicPartition, OffsetAndMetadata> tpOffset : nextCommitOffsets.entrySet()) {
                 //Update the OffsetManager for each committed partition, and update numUncommittedOffsets
                 final TopicPartition tp = tpOffset.getKey();
-                long position = kafkaConsumer.position(tp);
+                long position = consumer.position(tp);
                 long committedOffset = tpOffset.getValue().offset();
                 if (position < committedOffset) {
                     /*
@@ -528,7 +530,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                      */
                     LOG.debug("Consumer fell behind committed offset. Catching up. Position was [{}], skipping to [{}]",
                         position, committedOffset);
-                    kafkaConsumer.seek(tp, committedOffset);
+                    consumer.seek(tp, committedOffset);
                     List<ConsumerRecord<K, V>> waitingToEmitForTp = waitingToEmit.get(tp);
                     if (waitingToEmitForTp != null) {
                         //Discard the pending records that are already committed
@@ -568,11 +570,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
         if (!emitted.contains(msgId)) {
             LOG.debug("Received ack for message [{}], associated with tuple emitted for a ConsumerRecord that "
-                + "came from a topic-partition that this consumer group instance is no longer tracking "
-                + "due to rebalance/partition reassignment. No action taken.", msgId);
+                        + "came from a topic-partition that this consumer group instance is no longer tracking "
+                        + "due to rebalance/partition reassignment. No action taken.", msgId);
         } else {
             Validate.isTrue(!retryService.isScheduled(msgId), "The message id " + msgId + " is queued for retry while being acked."
-                + " This should never occur barring errors in the RetryService implementation or the spout code.");
+                        + " This should never occur barring errors in the RetryService implementation or the spout code.");
             offsetManagers.get(msgId.getTopicPartition()).addToAckMsgs(msgId);
             emitted.remove(msgId);
         }
@@ -621,12 +623,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     }
 
     private void refreshAssignment() {
-        Set<TopicPartition> allPartitions = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(kafkaConsumer);
+        Set<TopicPartition> allPartitions = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(consumer);
         List<TopicPartition> allPartitionsSorted = new ArrayList<>(allPartitions);
         Collections.sort(allPartitionsSorted, TopicPartitionComparator.INSTANCE);
         Set<TopicPartition> assignedPartitions = kafkaSpoutConfig.getTopicPartitioner()
             .getPartitionsForThisTask(allPartitionsSorted, context);
-        topicAssigner.assignPartitions(kafkaConsumer, assignedPartitions, rebalanceListener);
+        topicAssigner.assignPartitions(consumer, assignedPartitions, rebalanceListener);
     }
 
     @Override
@@ -658,7 +660,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             commitIfNecessary();
         } finally {
             //remove resources
-            kafkaConsumer.close();
+            consumer.close();
         }
     }
 

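[Editorial note, not part of the commit] The commitSync/commitAsync branches above are selected by the configured ProcessingGuarantee. As a rough sketch of how a user picks the guarantee: the builder's setProcessingGuarantee method is assumed here (it exists in storm-kafka-client but is not shown in this diff), and the broker, topic, and group id are placeholders.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;

    public class ProcessingGuaranteeSketch {
        public static KafkaSpout<String, String> atMostOnceSpout() {
            KafkaSpoutConfig<String, String> config = KafkaSpoutConfig
                .builder("localhost:9092", "my-topic")                      // placeholder broker and topic
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "spout-group")     // placeholder group id
                // AT_MOST_ONCE makes the spout commit polled offsets synchronously before emitting,
                // which is the commitSync branch visible in pollKafkaBroker above.
                .setProcessingGuarantee(ProcessingGuarantee.AT_MOST_ONCE)
                .build();
            return new KafkaSpout<>(config);
        }
    }
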
http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 3b7be2b..b17a47c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -25,7 +25,6 @@ import java.util.Map;
 import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Pattern;
-
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.KafkaConsumer;

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
index 3f16220..1001483 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTupleListener.java
@@ -22,7 +22,6 @@ import java.io.Serializable;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
-
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.task.TopologyContext;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
new file mode 100644
index 0000000..5ca7080
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactory.java
@@ -0,0 +1,28 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import java.io.Serializable;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+/**
+ * This is here to enable testing.
+ */
+public interface ConsumerFactory<K, V> extends Serializable {
+    public Consumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
+}

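[Editorial note, not part of the commit] The point of having this factory return the Consumer interface (rather than the concrete KafkaConsumer the removed KafkaConsumerFactory returned) is that tests can hand the spout a mock. A minimal sketch, assuming Mockito as the mocking library (which this module's tests already use) and a hypothetical MockConsumerFactory class name:

    import static org.mockito.Mockito.mock;

    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.internal.ConsumerFactory;

    /**
     * Test-only factory sketch: hands the spout a Mockito mock instead of a real KafkaConsumer.
     */
    public class MockConsumerFactory<K, V> implements ConsumerFactory<K, V> {

        @SuppressWarnings("unchecked")
        @Override
        public Consumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
            // Returning the Consumer interface is what the switch away from KafkaConsumer enables.
            return (Consumer<K, V>) mock(Consumer.class);
        }
    }

A test could then build the spout through the package-private constructor shown in KafkaSpout above, e.g. new KafkaSpout<>(spoutConfig, new MockConsumerFactory<>(), new TopicAssigner()).
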
http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
new file mode 100644
index 0000000..c384376
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/ConsumerFactoryDefault.java
@@ -0,0 +1,29 @@
+/*
+ * Copyright 2016 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.internal;
+
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+
+public class ConsumerFactoryDefault<K, V> implements ConsumerFactory<K, V> {
+
+    @Override
+    public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
+    }
+    
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
deleted file mode 100644
index fb70927..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactory.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Copyright 2016 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal;
-
-import java.io.Serializable;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-
-/**
- * This is here to enable testing.
- */
-public interface KafkaConsumerFactory<K, V> extends Serializable {
-    public KafkaConsumer<K,V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
deleted file mode 100644
index 9a8142a..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/KafkaConsumerFactoryDefault.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Copyright 2016 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.internal;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-
-public class KafkaConsumerFactoryDefault<K, V> implements KafkaConsumerFactory<K, V> {
-
-    @Override
-    public KafkaConsumer<K, V> createConsumer(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
-        return new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
-    }
-    
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
index c9f9541..5bec5b8 100755
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/OffsetManager.java
@@ -17,13 +17,11 @@
 package org.apache.storm.kafka.spout.internal;
 
 import com.google.common.annotations.VisibleForTesting;
-
 import java.util.Comparator;
 import java.util.Iterator;
 import java.util.NavigableSet;
 import java.util.NoSuchElementException;
 import java.util.TreeSet;
-
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutMessageId;

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
index 26eb135..da84979 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -22,8 +22,7 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Set;
 import java.util.function.Supplier;
-
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.internal.OffsetManager;
 import org.apache.storm.metric.api.IMetric;
@@ -55,10 +54,10 @@ public class KafkaOffsetMetric<K, V> implements IMetric {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
     private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
-    private final Supplier<KafkaConsumer<K,V>> consumerSupplier;
+    private final Supplier<Consumer<K,V>> consumerSupplier;
 
     public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
-        Supplier<KafkaConsumer<K, V>> consumerSupplier) {
+        Supplier<Consumer<K, V>> consumerSupplier) {
         this.offsetManagerSupplier = offsetManagerSupplier;
         this.consumerSupplier = consumerSupplier;
     }
@@ -67,9 +66,9 @@ public class KafkaOffsetMetric<K, V> implements IMetric {
     public Object getValueAndReset() {
 
         Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
-        KafkaConsumer<K,V> kafkaConsumer = consumerSupplier.get();
+        Consumer<K, V> consumer = consumerSupplier.get();
 
-        if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+        if (offsetManagers == null || offsetManagers.isEmpty() || consumer == null) {
             LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
             return null;
         }
@@ -77,8 +76,8 @@ public class KafkaOffsetMetric<K, V> implements IMetric {
         Map<String,TopicMetrics> topicMetricsMap = new HashMap<>();
         Set<TopicPartition> topicPartitions = offsetManagers.keySet();
 
-        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
-        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
+        Map<TopicPartition, Long> beginningOffsets = consumer.beginningOffsets(topicPartitions);
+        Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
         //map to hold partition level and topic level metrics
         Map<String, Long> result = new HashMap<>();
 

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
index 1591265..87257d8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/NamedTopicFilter.java
@@ -21,7 +21,7 @@ import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
 import org.slf4j.Logger;
@@ -52,7 +52,7 @@ public class NamedTopicFilter implements TopicFilter {
     }
     
     @Override
-    public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
+    public Set<TopicPartition> getAllSubscribedPartitions(Consumer<?, ?> consumer) {
         Set<TopicPartition> allPartitions = new HashSet<>();
         for (String topic : topics) {
             List<PartitionInfo> partitionInfoList = consumer.partitionsFor(topic);

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
index 554876f..9ba49dd 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/PatternTopicFilter.java
@@ -16,17 +16,14 @@
 
 package org.apache.storm.kafka.spout.subscription;
 
-import java.util.ArrayList;
-import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.regex.Pattern;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.TopicPartitionComparator;
 
 /**
  * Filter that returns all partitions for topics matching the given {@link Pattern}.
@@ -46,7 +43,7 @@ public class PatternTopicFilter implements TopicFilter {
     }
 
     @Override
-    public Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer) {
+    public Set<TopicPartition> getAllSubscribedPartitions(Consumer<?, ?> consumer) {
         topics.clear();
         Set<TopicPartition> allPartitions = new HashSet<>();
         for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
index dcc93ce..300adec 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicAssigner.java
@@ -17,10 +17,9 @@
 package org.apache.storm.kafka.spout.subscription;
 
 import java.io.Serializable;
-import java.util.HashSet;
 import java.util.Set;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 
 /**
@@ -38,7 +37,7 @@ public class TopicAssigner implements Serializable {
      * @param newAssignment The partitions to assign.
      * @param listener The rebalance listener to call back on when the assignment changes
      */
-    public <K, V> void assignPartitions(KafkaConsumer<K, V> consumer, Set<TopicPartition> newAssignment,
+    public <K, V> void assignPartitions(Consumer<K, V> consumer, Set<TopicPartition> newAssignment,
         ConsumerRebalanceListener listener) {
         Set<TopicPartition> currentAssignment = consumer.assignment();
         if (!newAssignment.equals(currentAssignment)) {
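Similarly, TopicAssigner.assignPartitions now accepts any Consumer. A minimal sketch of driving it with a mock consumer and a no-op rebalance listener (topic and partition are illustrative; per the javadoc above, the listener is only called back when the assignment actually changes):

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.MockConsumer;
    import org.apache.kafka.clients.consumer.OffsetResetStrategy;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.TopicAssigner;

    public class TopicAssignerSketch {
        public static void main(String[] args) {
            Consumer<String, String> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
            Set<TopicPartition> newAssignment = Collections.singleton(new TopicPartition("events", 0));

            // Assign the partition set manually; the listener hooks are left empty here.
            new TopicAssigner().assignPartitions(consumer, newAssignment, new ConsumerRebalanceListener() {
                @Override
                public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }

                @Override
                public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
            });

            System.out.println(consumer.assignment()); // [events-0]
        }
    }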

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
index ae2c254..6c59419 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/subscription/TopicFilter.java
@@ -18,7 +18,7 @@ package org.apache.storm.kafka.spout.subscription;
 
 import java.io.Serializable;
 import java.util.Set;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.Consumer;
 import org.apache.kafka.common.TopicPartition;
 
 public interface TopicFilter extends Serializable {
@@ -28,7 +28,7 @@ public interface TopicFilter extends Serializable {
      * @param consumer The Kafka consumer to use to read the list of existing partitions
      * @return The Kafka partitions this set of spouts should subscribe to
      */
-    Set<TopicPartition> getAllSubscribedPartitions(KafkaConsumer<?, ?> consumer);
+    Set<TopicPartition> getAllSubscribedPartitions(Consumer<?, ?> consumer);
     
     /**
      * Get the topics string.
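A custom TopicFilter only needs to implement this small interface. A minimal sketch that pins the spouts to one hard-coded partition, e.g. for tests (the class name and topic are illustrative, and the second interface method is assumed to be String getTopicsString() based on the javadoc fragment above):

    import java.util.Collections;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.Consumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.subscription.TopicFilter;

    // Pins the spout set to a single, hard-coded partition regardless of what the broker reports.
    public class SinglePartitionFilter implements TopicFilter {
        private static final long serialVersionUID = 1L;

        @Override
        public Set<TopicPartition> getAllSubscribedPartitions(Consumer<?, ?> consumer) {
            // The consumer is available for partition discovery, but this sketch ignores it.
            return Collections.singleton(new TopicPartition("events", 0));
        }

        @Override
        public String getTopicsString() {
            return "events";
        }
    }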

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentOpaqueSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentOpaqueSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentOpaqueSpoutEmitter.java
new file mode 100644
index 0000000..b2d79db
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentOpaqueSpoutEmitter.java
@@ -0,0 +1,68 @@
+/*
+ * Copyright 2018 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+
+public class KafkaTridentOpaqueSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
+        List<Map<String, Object>>,
+        KafkaTridentSpoutTopicPartition,
+        Map<String, Object>>,
+        Serializable {
+    
+    private static final long serialVersionUID = 1;
+    private final KafkaTridentSpoutEmitter<K, V> emitter;
+
+    public KafkaTridentOpaqueSpoutEmitter(KafkaTridentSpoutEmitter<K, V> emitter) {
+        this.emitter = emitter;
+    }
+
+    @Override
+    public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
+        KafkaTridentSpoutTopicPartition partition, Map<String, Object> lastPartitionMeta) {
+        return emitter.emitPartitionBatchNew(tx, collector, partition, lastPartitionMeta);
+    }
+
+    @Override
+    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
+        emitter.refreshPartitions(partitionResponsibilities);
+    }
+
+    @Override
+    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(List<Map<String, Object>> allPartitionInfo) {
+        return emitter.getOrderedPartitions(allPartitionInfo);
+    }
+
+    @Override
+    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
+        List<KafkaTridentSpoutTopicPartition> allPartitionInfoSorted) {
+        return emitter.getPartitionsForTask(taskId, numTasks, allPartitionInfoSorted);
+    }
+
+    @Override
+    public void close() {
+        emitter.close();
+    }
+    
+    
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
index 6e56bb5..f0d44b5 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -33,15 +33,17 @@ import org.slf4j.LoggerFactory;
 public class KafkaTridentSpoutBatchMetadata implements Serializable {
 
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class);
-    private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer();
 
     public static final String FIRST_OFFSET_KEY = "firstOffset";
     public static final String LAST_OFFSET_KEY = "lastOffset";
+    public static final String TOPOLOGY_ID_KEY = "topologyId";
 
     // first offset of this batch
     private final long firstOffset;
     // last offset of this batch
     private final long lastOffset;
+    // The unique topology id for the topology that created this metadata
+    private final String topologyId;
 
     /**
      * Builds a metadata object.
@@ -49,9 +51,10 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
      * @param firstOffset The first offset for the batch
      * @param lastOffset The last offset for the batch
      */
-    public KafkaTridentSpoutBatchMetadata(long firstOffset, long lastOffset) {
+    public KafkaTridentSpoutBatchMetadata(long firstOffset, long lastOffset, String topologyId) {
         this.firstOffset = firstOffset;
         this.lastOffset = lastOffset;
+        this.topologyId = topologyId;
     }
 
     /**
@@ -59,11 +62,12 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
      *
      * @param consumerRecords The non-empty set of records.
      */
-    public <K, V> KafkaTridentSpoutBatchMetadata(List<ConsumerRecord<K, V>> consumerRecords) {
+    public <K, V> KafkaTridentSpoutBatchMetadata(List<ConsumerRecord<K, V>> consumerRecords, String topologyId) {
         Validate.isTrue(!consumerRecords.isEmpty(), "There must be at least one record in order to build metadata");
 
         firstOffset = consumerRecords.get(0).offset();
         lastOffset = consumerRecords.get(consumerRecords.size() - 1).offset();
+        this.topologyId = topologyId;
         LOG.debug("Created {}", this.toString());
     }
 
@@ -75,6 +79,10 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
         return lastOffset;
     }
 
+    public String getTopologyId() {
+        return topologyId;
+    }
+
     /**
      * Constructs a metadata object from a Map in the format produced by {@link #toMap() }.
      *
@@ -82,8 +90,11 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
      * @return A new metadata object
      */
     public static KafkaTridentSpoutBatchMetadata fromMap(Map<String, Object> map) {
-        return new KafkaTridentSpoutBatchMetadata(((Number) map.get(FIRST_OFFSET_KEY)).longValue(),
-            ((Number) map.get(LAST_OFFSET_KEY)).longValue());
+        return new KafkaTridentSpoutBatchMetadata(
+            ((Number) map.get(FIRST_OFFSET_KEY)).longValue(),
+            ((Number) map.get(LAST_OFFSET_KEY)).longValue(),
+            (String) map.get(TOPOLOGY_ID_KEY)
+        );
     }
 
     /**
@@ -93,6 +104,7 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
         Map<String, Object> map = new HashMap<>();
         map.put(FIRST_OFFSET_KEY, firstOffset);
         map.put(LAST_OFFSET_KEY, lastOffset);
+        map.put(TOPOLOGY_ID_KEY, topologyId);
         return map;
     }
 
@@ -101,6 +113,7 @@ public class KafkaTridentSpoutBatchMetadata implements Serializable {
         return "KafkaTridentSpoutBatchMetadata{"
             + "firstOffset=" + firstOffset
             + ", lastOffset=" + lastOffset
+            + ", topologyId=" + topologyId
             + '}';
     }
 }
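With the new field, a metadata round trip through the Map form looks roughly like this (offsets and topology id are illustrative values):

    import java.util.Map;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutBatchMetadata;

    public class BatchMetadataSketch {
        public static void main(String[] args) {
            KafkaTridentSpoutBatchMetadata meta = new KafkaTridentSpoutBatchMetadata(100L, 199L, "topology-1");

            // Trident persists batch metadata in Map form; topologyId now rides along with the offsets.
            Map<String, Object> asMap = meta.toMap();
            // {firstOffset=100, lastOffset=199, topologyId=topology-1}

            KafkaTridentSpoutBatchMetadata restored = KafkaTridentSpoutBatchMetadata.fromMap(asMap);
            System.out.println(restored.getTopologyId()); // topology-1
        }
    }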

http://git-wip-us.apache.org/repos/asf/storm/blob/ba526077/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
new file mode 100644
index 0000000..4e46d4c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutCoordinator.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+import org.apache.kafka.clients.consumer.Consumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.internal.ConsumerFactory;
+import org.apache.storm.kafka.spout.internal.ConsumerFactoryDefault;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaTridentSpoutCoordinator<K, V> implements
+        IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
+        IPartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
+        Serializable {
+    // Initial delay for the assignment refresh timer
+    public static final long TIMER_DELAY_MS = 500;
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutCoordinator.class);
+
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private final Timer refreshAssignmentTimer;
+    private final Consumer<K, V> consumer;
+    
+    private Set<TopicPartition> partitionsForBatch;
+
+    /**
+     * Creates a new coordinator based on the given spout config.
+     * @param kafkaSpoutConfig The spout config to use
+     */
+    public KafkaTridentSpoutCoordinator(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this(kafkaSpoutConfig, new ConsumerFactoryDefault<>());
+    }
+    
+    KafkaTridentSpoutCoordinator(KafkaSpoutConfig<K, V> kafkaSpoutConfig, ConsumerFactory<K, V> consumerFactory) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
+        this.refreshAssignmentTimer = new Timer(TIMER_DELAY_MS, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
+        this.consumer = consumerFactory.createConsumer(kafkaSpoutConfig);
+        LOG.debug("Created {}", this.toString());
+    }
+
+    @Override
+    public boolean isReady(long txid) {
+        LOG.debug("isReady = true");
+        return true;    // the "old" trident kafka spout always returned true as well
+    }
+
+    @Override
+    public List<Map<String, Object>> getPartitionsForBatch() {
+        if (refreshAssignmentTimer.isExpiredResetOnTrue() || partitionsForBatch == null) {
+            partitionsForBatch = kafkaSpoutConfig.getTopicFilter().getAllSubscribedPartitions(consumer);
+        }
+        LOG.debug("TopicPartitions for batch {}", partitionsForBatch);
+        return partitionsForBatch.stream()
+            .map(tp -> tpSerializer.toMap(tp))
+            .collect(Collectors.toList());
+    }
+
+    @Override
+    public void close() {
+        this.consumer.close();
+        LOG.debug("Closed");
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString()
+                + "{kafkaSpoutConfig=" + kafkaSpoutConfig
+                + '}';
+    }
+}
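
Putting the coordinator together, a rough usage sketch (broker address, topic, and refresh period are illustrative; it assumes a reachable broker, since the public constructor creates a real consumer via ConsumerFactoryDefault):

    import java.util.List;
    import java.util.Map;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutCoordinator;

    public class CoordinatorSketch {
        public static void main(String[] args) {
            KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("broker:9092", "events")
                .setPartitionRefreshPeriodMs(2_000)
                .build();

            // The partition list is fetched for the first batch and then refreshed at most
            // every 2 seconds, as governed by the refresh timer.
            KafkaTridentSpoutCoordinator<String, String> coordinator = new KafkaTridentSpoutCoordinator<>(conf);
            List<Map<String, Object>> partitionsForBatch = coordinator.getPartitionsForBatch();
            System.out.println(partitionsForBatch.size());
            coordinator.close();
        }
    }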