You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:09 UTC

[08/14] storm git commit: STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: copied external/storm-kafka-client from 1.x-branch

STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: copied external/storm-kafka-client from 1.x-branch

Manually copied from 1.x-branch at commit SHA 74ca795.

This intentionally doesn't compile; it's meant to be an exact copy of what was in 1.x-branch, with no manual changes.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/e16fa19f
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/e16fa19f
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/e16fa19f

Branch: refs/heads/1.0.x-branch
Commit: e16fa19f21ec0288f915fbd7f0730821cdb318a4
Parents: 9c89300
Author: Erik Weathers <er...@gmail.com>
Authored: Tue Feb 6 16:10:36 2018 -0800
Committer: Erik Weathers <er...@gmail.com>
Committed: Wed Feb 7 18:39:23 2018 -0800

----------------------------------------------------------------------
 external/storm-kafka-client/README.md           | 162 +---
 external/storm-kafka-client/pom.xml             |  82 +-
 .../org/apache/storm/kafka/bolt/KafkaBolt.java  | 220 +++++
 .../FieldNameBasedTupleToKafkaMapper.java       |  48 +
 .../kafka/bolt/mapper/TupleToKafkaMapper.java   |  32 +
 .../bolt/selector/DefaultTopicSelector.java     |  34 +
 .../bolt/selector/FieldIndexTopicSelector.java  |  52 ++
 .../bolt/selector/FieldNameTopicSelector.java   |  49 +
 .../kafka/bolt/selector/KafkaTopicSelector.java |  26 +
 .../kafka/spout/ByTopicRecordTranslator.java    | 149 +++
 .../kafka/spout/DefaultRecordTranslator.java    |  47 +
 .../kafka/spout/EmptyKafkaTupleListener.java    |  53 ++
 .../java/org/apache/storm/kafka/spout/Func.java |  28 +
 .../apache/storm/kafka/spout/KafkaSpout.java    | 887 ++++++++++--------
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 928 +++++++++++++++----
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  57 +-
 .../KafkaSpoutRetryExponentialBackoff.java      | 150 ++-
 .../kafka/spout/KafkaSpoutRetryService.java     |  38 +-
 .../storm/kafka/spout/KafkaSpoutStream.java     | 121 ---
 .../storm/kafka/spout/KafkaSpoutStreams.java    |  35 -
 .../spout/KafkaSpoutStreamsNamedTopics.java     | 154 ---
 .../spout/KafkaSpoutStreamsWildcardTopics.java  |  61 --
 .../kafka/spout/KafkaSpoutTupleBuilder.java     |  58 --
 .../kafka/spout/KafkaSpoutTuplesBuilder.java    |  32 -
 .../KafkaSpoutTuplesBuilderNamedTopics.java     |  78 --
 .../KafkaSpoutTuplesBuilderWildcardTopics.java  |  36 -
 .../apache/storm/kafka/spout/KafkaTuple.java    |  47 +
 .../storm/kafka/spout/KafkaTupleListener.java   |  83 ++
 .../spout/ManualPartitionSubscription.java      |  69 ++
 .../storm/kafka/spout/ManualPartitioner.java    |  41 +
 .../storm/kafka/spout/NamedSubscription.java    |  64 ++
 .../storm/kafka/spout/NamedTopicFilter.java     |  68 ++
 .../storm/kafka/spout/PatternSubscription.java  |  56 ++
 .../storm/kafka/spout/PatternTopicFilter.java   |  70 ++
 .../storm/kafka/spout/RecordTranslator.java     |  56 ++
 .../spout/RoundRobinManualPartitioner.java      |  50 +
 .../kafka/spout/SerializableDeserializer.java   |  29 +
 .../kafka/spout/SimpleRecordTranslator.java     |  58 ++
 .../apache/storm/kafka/spout/Subscription.java  |  56 ++
 .../apache/storm/kafka/spout/TopicFilter.java   |  38 +
 .../kafka/spout/TopicPartitionComparator.java   |  49 +
 .../kafka/spout/internal/CommitMetadata.java    |  63 ++
 .../spout/internal/CommitMetadataManager.java   |  91 ++
 .../internal/KafkaConsumerFactoryDefault.java   |   3 +-
 .../kafka/spout/internal/OffsetManager.java     | 246 +++++
 .../storm/kafka/spout/internal/Timer.java       |  75 ++
 .../kafka/spout/metrics/KafkaOffsetMetric.java  | 141 +++
 .../trident/KafkaTridentSpoutBatchMetadata.java | 124 +++
 .../spout/trident/KafkaTridentSpoutEmitter.java | 279 ++++++
 .../spout/trident/KafkaTridentSpoutManager.java | 117 +++
 .../spout/trident/KafkaTridentSpoutOpaque.java  |  75 ++
 .../KafkaTridentSpoutOpaqueCoordinator.java     |  70 ++
 .../KafkaTridentSpoutTopicPartition.java        |  68 ++
 ...KafkaTridentSpoutTopicPartitionRegistry.java |  48 +
 .../trident/KafkaTridentSpoutTransactional.java |  48 +
 .../spout/trident/TopicPartitionSerializer.java |  47 +
 .../storm/kafka/trident/TridentKafkaState.java  | 138 +++
 .../kafka/trident/TridentKafkaStateFactory.java |  64 ++
 .../kafka/trident/TridentKafkaStateUpdater.java |  34 +
 .../FieldNameBasedTupleToKafkaMapper.java       |  41 +
 .../mapper/TridentTupleToKafkaMapper.java       |  28 +
 .../trident/selector/DefaultTopicSelector.java  |  34 +
 .../trident/selector/KafkaTopicSelector.java    |  26 +
 .../java/org/apache/storm/kafka/KafkaUnit.java  | 114 +++
 .../org/apache/storm/kafka/KafkaUnitRule.java   |  46 +
 .../apache/storm/kafka/bolt/KafkaBoltTest.java  |  91 ++
 .../spout/ByTopicRecordTranslatorTest.java      |  93 ++
 .../spout/DefaultRecordTranslatorTest.java      |  37 +
 .../kafka/spout/KafkaSpoutAbstractTest.java     | 179 ++++
 .../storm/kafka/spout/KafkaSpoutConfigTest.java | 241 +++++
 .../storm/kafka/spout/KafkaSpoutEmitTest.java   | 214 +++++
 .../KafkaSpoutLogCompactionSupportTest.java     | 223 +++++
 .../spout/KafkaSpoutMessagingGuaranteeTest.java | 259 ++++++
 .../kafka/spout/KafkaSpoutReactivationTest.java | 145 +++
 .../kafka/spout/KafkaSpoutRebalanceTest.java    | 204 ++--
 .../KafkaSpoutRetryExponentialBackoffTest.java  | 292 ++++++
 .../kafka/spout/KafkaSpoutRetryLimitTest.java   | 114 +++
 .../kafka/spout/KafkaSpoutSingleTopicTest.java  | 379 ++++++++
 ...outTopologyDeployActivateDeactivateTest.java | 116 +++
 .../kafka/spout/MaxUncommittedOffsetTest.java   | 293 ++++++
 .../kafka/spout/NamedSubscriptionTest.java      |  54 ++
 .../storm/kafka/spout/NamedTopicFilterTest.java |  70 ++
 .../kafka/spout/PatternTopicFilterTest.java     |  75 ++
 .../spout/SingleTopicKafkaUnitSetupHelper.java  |  89 ++
 .../SpoutWithMockedConsumerSetupHelper.java     | 171 ++++
 .../SingleTopicKafkaSpoutConfiguration.java     |  82 +-
 .../builders/TopicKeyValueTupleBuilder.java     |  40 -
 .../kafka/spout/internal/OffsetManagerTest.java | 196 ++++
 .../ManualPartitionSubscriptionTest.java        |  81 ++
 .../test/KafkaSpoutTopologyMainNamedTopics.java |  88 +-
 .../KafkaSpoutTopologyMainWildcardTopics.java   |  52 +-
 .../spout/test/TopicTest2TupleBuilder.java      |  40 -
 .../test/TopicsTest0Test1TupleBuilder.java      |  42 -
 .../KafkaTridentSpoutBatchMetadataTest.java     |  66 ++
 .../src/test/resources/log4j2.xml               |  32 +
 95 files changed, 8844 insertions(+), 1685 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md
index fba5766..8d27aa4 100644
--- a/external/storm-kafka-client/README.md
+++ b/external/storm-kafka-client/README.md
@@ -1,160 +1,6 @@
-#Storm Kafka Spout with New Kafka Consumer API
+# Storm Apache Kafka integration using the kafka-client jar
+This includes the new Apache Kafka consumer API.
 
-Apache Storm Spout implementation to consume data from Apache Kafka 0.9.x. It allows 
-clients to consume data from Kafka starting at offsets as defined by the offset strategy specified in `FirstPollOffsetStrategy`. 
-In case of failure, the Kafka Spout will re-start consuming messages from the offset that matches the chosen `FirstPollOffsetStrategy`.
+Spouts and Bolts that read from and write to Kafka through the kafka-client library.
 
-The Kafka Spout implementation allows you to specify the stream (`KafkaSpoutStream`) associated with each topic. `KafkaSpoutStream` represents the stream and output fields used by a topic.
-
-The `KafkaSpoutTuplesBuilder` wraps all the logic that builds `Tuple`s from `ConsumerRecord`s. The logic is provided by the user by implementing the appropriate number of `KafkaSpoutTupleBuilder` instances.
-
-Multiple topics can use the same `KafkaSpoutTupleBuilder` implementation, as long as the logic to build `Tuple`s from `ConsumerRecord`s is identical.
-
-# Usage Examples
-
-### Create a Kafka Spout:
-
-The code snippet bellow is extracted from the example in the module [test] (https://github.com/apache/storm/tree/master/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test). Please refer to this module for more detail
-
-```java
- KafkaSpoutStreams kafkaSpoutStreams = new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})
-                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
-                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
-                .build();
-
-        Map<String, Object> kafkaConsumerProps = new HashMap<>();
-        kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
-        kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
-        kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        kafkaConsumerProps.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-
-        KafkaSpoutTuplesBuilder<String, String> tuplesBuilder = new KafkaSpoutTuplesBuilder.Builder<>(
-                new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
-                new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
-                .build();
-
-        KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(new KafkaSpoutRetryExponentialBackoff.TimeInterval(500, TimeUnit.MICROSECONDS),
-                KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(2), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));
-
-        KafkaSpoutConfig kafkaSpoutConfig = new KafkaSpoutConfig.Builder<String, String>(kafkaConsumerProps, kafkaSpoutStreams, tuplesBuilder, retryService)
-                .setOffsetCommitPeriodMs(10_000)
-                .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
-                .setMaxUncommittedOffsets(250)
-                .build();
-        KafkaSpout<String,String> kafkaSpout = new KafkaSpout<>(kafkaSpoutConfig);
-```
- 
-### Create a simple Toplogy using the Kafka Spout:
-
-```java
-TopologyBuilder tp = new TopologyBuilder();
-tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
-tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
-tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
-tp.createTopology();
-```
-
-# Build And Run Bundled Examples  
-To be able to run the examples you must first build the java code in the package `storm-kafka-client`, 
-and then generate an uber jar with all the dependencies.
-
-## Use the Maven Shade Plugin to Build the Uber Jar
-
-Add the following to `REPO_HOME/storm/external/storm-kafka-client/pom.xml`
-```xml
-<plugin>
-    <groupId>org.apache.maven.plugins</groupId>
-    <artifactId>maven-shade-plugin</artifactId>
-    <version>2.4.1</version>
-    <executions>
-        <execution>
-            <phase>package</phase>
-            <goals>
-                <goal>shade</goal>
-            </goals>
-            <configuration>
-                <transformers>
-                    <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
-                        <mainClass>org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain</mainClass>
-                    </transformer>
-                </transformers>
-            </configuration>
-        </execution>
-    </executions>
-</plugin>
-```
-
-create the uber jar by running the commmand:
-
-`mvn package -f REPO_HOME/storm/external/storm-kafka-client/pom.xml`
-
-This will create the uber jar file with the name and location matching the following pattern:
- 
-`REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar`
-
-### Run Storm Topology
-
-Copy the file `REPO_HOME/storm/external/storm-kafka-client/target/storm-kafka-client-1.0.x.jar` to `STORM_HOME/extlib`
-
-Using the Kafka command line tools create three topics [test, test1, test2] and use the Kafka console producer to populate the topics with some data 
-
-Execute the command `STORM_HOME/bin/storm jar REPO_HOME/storm/external/storm/target/storm-kafka-client-1.0.x.jar org.apache.storm.kafka.spout.test.KafkaSpoutTopologyMain`
-
-With the debug level logs enabled it is possible to see the messages of each topic being redirected to the appropriate Bolt as defined 
-by the streams defined and choice of shuffle grouping.
-
-## Using storm-kafka-client with different versions of kafka
-
-Storm-kafka-client's Kafka dependency is defined as `provided` scope in maven, meaning it will not be pulled in
-as a transitive dependency. This allows you to use a version of Kafka dependency compatible with your kafka cluster.
-
-When building a project with storm-kafka-client, you must explicitly add the Kafka clients dependency. For example, to
-use Kafka-clients 0.9.0.1, you would use the following dependency in your `pom.xml`:
-
-```xml
-        <dependency>
-            <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka-clients</artifactId>
-            <version>0.9.0.1</version>
-        </dependency>
-```
-
-You can also override the kafka clients version while building from maven, with parameter `storm.kafka.client.version`
-e.g. `mvn clean install -Dstorm.kafka.client.version=0.9.0.1`
-
-When selecting a kafka client version, you should ensure -
- 1. kafka api is compatible. storm-kafka-client module only supports **0.9 or newer** kafka client API. For older versions,
- you can use storm-kafka module (https://github.com/apache/storm/tree/master/external/storm-kafka).
- 2. The kafka client selected by you should be wire compatible with the broker. e.g. 0.9.x client will not work with
- 0.8.x broker.
-
-#Kafka Spout Performance Tuning
-
-The Kafka spout provides two internal parameters to control its performance. The parameters can be set using the [KafkaSpoutConfig] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) methods [setOffsetCommitPeriodMs] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L189-L193) and [setMaxUncommittedOffsets] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L211-L217). 
-
-* "offset.commit.period.ms" controls how often the spout commits to Kafka
-* "max.uncommitted.offsets" controls how many offsets can be pending commit before another poll can take place
-<br/>
-
-The [Kafka consumer config] (http://kafka.apache.org/documentation.html#consumerconfigs) parameters may also have an impact on the performance of the spout. The following Kafka parameters are likely the most influential in the spout performance: 
-
-* “fetch.min.bytes”
-* “fetch.max.wait.ms”
-* [Kafka Consumer] (http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html) instance poll timeout, which is specified for each Kafka spout using the [KafkaSpoutConfig] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java) method [setPollTimeoutMs] (https://github.com/apache/storm/blob/1.0.x-branch/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java#L180-L184)
-<br/>
-
-Depending on the structure of your Kafka cluster, distribution of the data, and availability of data to poll, these parameters will have to be configured appropriately. Please refer to the Kafka documentation on Kafka parameter tuning.
-
-###Default values
-
-Currently the Kafka spout has has the following default values, which have shown to give good performance in the test environment as described in this [blog post] (https://hortonworks.com/blog/microbenchmarking-storm-1-0-performance/)
-
-* poll.timeout.ms = 200
-* offset.commit.period.ms = 30000   (30s)
-* max.uncommitted.offsets = 10000000
-<br/>
-
-There will be a blog post coming soon analyzing the trade-offs of this tuning parameters, and comparing the performance of the Kafka Spouts using the Kafka client API introduced in 0.9 (new implementation) and in prior versions (prior implementation)
-
-#Future Work
-Trident spout implementation, support for topic patterns, and comprehensive metrics
+Please see [here](../../docs/storm-kafka-client.md) for details on how to use it.

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
index 14f6c5e..51bb797 100644
--- a/external/storm-kafka-client/pom.xml
+++ b/external/storm-kafka-client/pom.xml
@@ -22,7 +22,7 @@
     <parent>
         <artifactId>storm</artifactId>
         <groupId>org.apache.storm</groupId>
-        <version>1.0.6-SNAPSHOT</version>
+        <version>1.2.0-SNAPSHOT</version>
         <relativePath>../../pom.xml</relativePath>
     </parent>
 
@@ -40,18 +40,44 @@
     </developers>
 
     <dependencies>
+        <dependency>
+            <groupId>commons-lang</groupId>
+            <artifactId>commons-lang</artifactId>
+        </dependency>
         <!--parent module dependency-->
         <dependency>
             <groupId>org.apache.storm</groupId>
             <artifactId>storm-core</artifactId>
             <version>${project.version}</version>
-            <scope>provided</scope>
+            <scope>${provided.scope}</scope>
         </dependency>
         <!--kafka libraries-->
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
             <version>${storm.kafka.client.version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.apache.zookeeper</groupId>
+                    <artifactId>zookeeper</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>log4j</groupId>
+                    <artifactId>log4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.fasterxml.jackson.core</groupId>
+            <artifactId>jackson-databind</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.google.guava</groupId>
+            <artifactId>guava</artifactId>
         </dependency>
         <!--test dependencies -->
         <dependency>
@@ -62,21 +88,67 @@
         </dependency>
         <dependency>
             <groupId>org.hamcrest</groupId>
-            <artifactId>hamcrest-all</artifactId>
+            <artifactId>hamcrest-core</artifactId>
             <version>1.3</version>
             <scope>test</scope>
         </dependency>
         <dependency>
-            <groupId>junit</groupId>
-            <artifactId>junit</artifactId>
+            <groupId>org.hamcrest</groupId>
+            <artifactId>hamcrest-library</artifactId>
+            <version>1.3</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>log4j-over-slf4j</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${storm.kafka.client.version}</version>
+            <classifier>test</classifier>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <version>${storm.kafka.client.version}</version>
+            <classifier>test</classifier>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka_2.11</artifactId>
+            <version>${storm.kafka.client.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-log4j12</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
     <build>
         <plugins>
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-surefire-plugin</artifactId>
+                <configuration>
+                    <reuseForks>true</reuseForks>
+                    <forkCount>1</forkCount>
+                </configuration>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-jar-plugin</artifactId>
                 <version>2.5</version>
                 <executions>

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
new file mode 100644
index 0000000..30f97a0
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/KafkaBolt.java
@@ -0,0 +1,220 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.utils.TupleUtils;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.clients.producer.Callback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
+import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
+import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
+import java.util.concurrent.Future;
+import java.util.concurrent.ExecutionException;
+import java.util.Map;
+import java.util.Properties;
+
+
+/**
+ * Bolt implementation that can send Tuple data to Kafka.
+ * <p>
+ * Configuration is done through the various setter methods in the bolt;
+ * producer configuration is taken only from
+ * {@link #withProducerProperties(Properties)}.
+ * <p>
+ * For backwards compatibility the topic may instead be placed in the
+ * storm config under the key 'topic'; it is used only when no topic
+ * selector was set on the bolt, and {@link #prepare} fails with an
+ * {@link IllegalStateException} if neither is present.
+ */
+public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
+    private static final long serialVersionUID = -5205886631877033478L;
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
+    public static final String TOPIC = "topic";
+
+    private KafkaProducer<K, V> producer;
+    private OutputCollector collector;
+    private TupleToKafkaMapper<K,V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties boltSpecifiedProperties = new Properties();
+    /**
+     * See {@link KafkaBolt#setFireAndForget(boolean)} for more details on this.
+     */
+    private boolean fireAndForget = false;
+    /**
+     * See {@link KafkaBolt#setAsync(boolean)} for more details on this.
+     */
+    private boolean async = true;
+
+    public KafkaBolt() {}
+
+    public KafkaBolt<K,V> withTupleToKafkaMapper(TupleToKafkaMapper<K,V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    /**
+     * Set the messages to be published to a single topic.
+     * @param topic the topic to publish to
+     * @return this
+     */
+    public KafkaBolt<K, V> withTopicSelector(String topic) {
+        return withTopicSelector(new DefaultTopicSelector(topic));
+    }
+
+    public KafkaBolt<K,V> withTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public KafkaBolt<K,V> withProducerProperties(Properties producerProperties) {
+        this.boltSpecifiedProperties = producerProperties;
+        return this;
+    }
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        LOG.info("Preparing bolt with configuration {}", this);
+        //for backward compatibility.
+        if (mapper == null) {
+            LOG.info("Mapper not specified. Setting default mapper to {}", FieldNameBasedTupleToKafkaMapper.class.getSimpleName());
+            this.mapper = new FieldNameBasedTupleToKafkaMapper<K,V>();
+        }
+
+        //for backward compatibility.
+        if (topicSelector == null) {
+            if (stormConf.containsKey(TOPIC)) {
+                LOG.info("TopicSelector not specified. Using [{}] for topic [{}] specified in bolt configuration,",
+                        DefaultTopicSelector.class.getSimpleName(), stormConf.get(TOPIC));
+                this.topicSelector = new DefaultTopicSelector((String) stormConf.get(TOPIC));
+            } else {
+                throw new IllegalStateException("topic should be specified in bolt's configuration");
+            }
+        }
+
+        producer = mkProducer(boltSpecifiedProperties);
+        this.collector = collector;
+    }
+
+    /**
+     * Intended to be overridden for tests.  Make the producer with the given props.
+     */
+    protected KafkaProducer<K, V> mkProducer(Properties props) {
+        return new KafkaProducer<>(props);
+    }
+
+    @Override
+    protected void process(final Tuple input) {
+        K key = null;
+        V message = null;
+        String topic = null;
+        try {
+            key = mapper.getKeyFromTuple(input);
+            message = mapper.getMessageFromTuple(input);
+            topic = topicSelector.getTopic(input);
+            if (topic != null) {
+                Callback callback = null; // stays null unless acking is deferred to send completion
+                if (!fireAndForget && async) {
+                    callback = new Callback() {
+                        @Override
+                        public void onCompletion(RecordMetadata ignored, Exception e) {
+                            synchronized (collector) {
+                                if (e != null) {
+                                    collector.reportError(e);
+                                    collector.fail(input);
+                                } else {
+                                    collector.ack(input);
+                                }
+                            }
+                        }
+                    };
+                }
+                Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback);
+                if (!async) {
+                    try {
+                        result.get(); // synchronous mode: block until the send has completed
+                        collector.ack(input);
+                    } catch (ExecutionException err) {
+                        collector.reportError(err);
+                        collector.fail(input);
+                    }
+                } else if (fireAndForget) {
+                    collector.ack(input);
+                }
+            } else {
+                LOG.warn("skipping key = {}, topic selector returned null.", key);
+                collector.ack(input);
+            }
+        } catch (Exception ex) {
+            if (ex instanceof InterruptedException) {
+                Thread.currentThread().interrupt(); // e.g. result.get() interrupted: keep the interrupt status
+            }
+            collector.reportError(ex);
+            collector.fail(input);
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        // Intentionally empty: no output fields are declared.
+    }
+
+    @Override
+    public void cleanup() {
+        producer.close();
+    }
+
+    /**
+     * If set to true the bolt will assume that sending a message to kafka will succeed and will ack
+     * the tuple as soon as it has handed the message off to the producer API
+     * if false (the default) the message will be acked after it was successfully sent to kafka or
+     * failed if it was not successfully sent.
+     * @param fireAndForget true to ack right after the hand-off to the producer, else false.
+     */
+    public void setFireAndForget(boolean fireAndForget) {
+        this.fireAndForget = fireAndForget;
+    }
+
+    /**
+     * If set to true(the default) the bolt will not wait for the message
+     * to be fully sent to Kafka before getting another tuple to send.
+     * @param async true to have multiple tuples in flight to kafka, else false.
+     */
+    public void setAsync(boolean async) {
+        this.async = async;
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaBolt: {mapper: " + mapper +
+                " topicSelector: " + topicSelector +
+                " fireAndForget: " + fireAndForget +
+                " async: " + async +
+                " properties: " + boltSpecifiedProperties + "}";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..f7638aa
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.mapper;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Maps a tuple to a kafka key and message by reading two named tuple fields.
+ * The no-argument constructor uses {@link #BOLT_KEY} and {@link #BOLT_MESSAGE} as field names.
+ * @param <K> type of key.
+ * @param <V> type of value.
+ */
+public class FieldNameBasedTupleToKafkaMapper<K,V> implements TupleToKafkaMapper<K, V> {
+    private static final long serialVersionUID = -8794262989021702349L;
+    public static final String BOLT_KEY = "key";
+    public static final String BOLT_MESSAGE = "message";
+    public String boltKeyField;
+    public String boltMessageField;
+
+    /**
+     * Creates a mapper that reads the key and the message from the default field names.
+     */
+    public FieldNameBasedTupleToKafkaMapper() {
+        this(BOLT_KEY, BOLT_MESSAGE);
+    }
+
+    /**
+     * Creates a mapper that reads the key and the message from the given fields.
+     * @param boltKeyField name of the tuple field holding the key
+     * @param boltMessageField name of the tuple field holding the message
+     */
+    public FieldNameBasedTupleToKafkaMapper(String boltKeyField, String boltMessageField) {
+        this.boltKeyField = boltKeyField;
+        this.boltMessageField = boltMessageField;
+    }
+
+    /**
+     * Reads the key from the configured key field.
+     * @param tuple the tuple to read from
+     * @return the value of the key field, or null when the tuple has no such field
+     */
+    @Override
+    public K getKeyFromTuple(Tuple tuple) {
+        //for backward compatibility, we return null when key is not present.
+        if (!tuple.contains(boltKeyField)) {
+            return null;
+        }
+        final Object key = tuple.getValueByField(boltKeyField);
+        return (K) key;
+    }
+
+    /**
+     * Reads the message from the configured message field.
+     * @param tuple the tuple to read from
+     * @return the value of the message field
+     */
+    @Override
+    public V getMessageFromTuple(Tuple tuple) {
+        final Object message = tuple.getValueByField(boltMessageField);
+        return (V) message;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
new file mode 100644
index 0000000..9f11fc9
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/mapper/TupleToKafkaMapper.java
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.mapper;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+/**
+ * Interface defining a mapping from storm tuple to kafka key and message.
+ * @param <K> type of key.
+ * @param <V> type of value.
+ */
+public interface TupleToKafkaMapper<K,V> extends Serializable {
+    /**
+     * Extracts the kafka key from a tuple.
+     * @param tuple the storm tuple to map
+     * @return the key for the tuple; implementations may return null
+     *     (FieldNameBasedTupleToKafkaMapper does so when its key field is absent)
+     */
+    K getKeyFromTuple(Tuple tuple);
+    /**
+     * Extracts the kafka message from a tuple.
+     * @param tuple the storm tuple to map
+     * @return the message for the tuple
+     */
+    V getMessageFromTuple(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..3d00fc1
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+/**
+ * Topic selector that answers with one fixed topic name for every tuple.
+ */
+public class DefaultTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = 4601118062437851265L;
+    private final String topicName;
+
+    /**
+     * @param topicName the topic returned for every tuple
+     */
+    public DefaultTopicSelector(String topicName) {
+        this.topicName = topicName;
+    }
+
+    /**
+     * Returns the configured topic name; the tuple itself is not inspected.
+     * @param tuple ignored
+     * @return the topic name given at construction
+     */
+    @Override
+    public String getTopic(Tuple tuple) {
+        return this.topicName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
new file mode 100644
index 0000000..ffe0b35a
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldIndexTopicSelector.java
@@ -0,0 +1,52 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses the field at a given index to select the topic name from a tuple.
+ */
+public class FieldIndexTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -3830575380208166367L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(FieldIndexTopicSelector.class);
+
+    private final int fieldIndex;
+    private final String defaultTopicName;
+
+    /**
+     * @param fieldIndex index of the tuple field that holds the topic name
+     * @param defaultTopicName topic returned when the tuple has no field at fieldIndex
+     * @throws IllegalArgumentException if fieldIndex is negative
+     */
+    public FieldIndexTopicSelector(int fieldIndex, String defaultTopicName) {
+        if (fieldIndex < 0) {
+            throw new IllegalArgumentException("fieldIndex cannot be negative");
+        }
+        this.fieldIndex = fieldIndex;
+        this.defaultTopicName = defaultTopicName;
+    }
+
+    /**
+     * Reads the topic name from the configured field index.
+     * @param tuple the tuple to read the topic from
+     * @return the string at fieldIndex, or the default topic (after logging a warning)
+     *     when fieldIndex is not smaller than the tuple size
+     */
+    @Override
+    public String getTopic(Tuple tuple) {
+        final boolean outOfBounds = fieldIndex >= tuple.size();
+        if (outOfBounds) {
+            LOG.warn("Field index {} is out of bounds. Using default topic {}", fieldIndex, defaultTopicName);
+            return defaultTopicName;
+        }
+        return tuple.getString(fieldIndex);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
new file mode 100644
index 0000000..e90b26f
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/FieldNameTopicSelector.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Uses a named field to select the topic name from a tuple.
+ */
+public class FieldNameTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -3903708904533396833L;
+    private static final Logger LOG = LoggerFactory.getLogger(FieldNameTopicSelector.class);
+
+    private final String fieldName;
+    private final String defaultTopicName;
+
+    /**
+     * @param fieldName name of the tuple field that holds the topic name
+     * @param defaultTopicName topic returned when the tuple does not contain fieldName
+     */
+    public FieldNameTopicSelector(String fieldName, String defaultTopicName) {
+        this.fieldName = fieldName;
+        this.defaultTopicName = defaultTopicName;
+    }
+
+    /**
+     * Reads the topic name from the configured field.
+     * @param tuple the tuple to read the topic from
+     * @return the string stored under fieldName, or the default topic (after logging a warning)
+     *     when the tuple does not contain that field
+     */
+    @Override
+    public String getTopic(Tuple tuple) {
+        final boolean fieldMissing = !tuple.contains(fieldName);
+        if (fieldMissing) {
+            LOG.warn("Field {} Not Found. Returning default topic {}", fieldName, defaultTopicName);
+            return defaultTopicName;
+        }
+        return tuple.getStringByField(fieldName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..cb7fb44
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/bolt/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt.selector;
+
+import org.apache.storm.tuple.Tuple;
+
+import java.io.Serializable;
+
+/**
+ * Selects the topic for a storm tuple.
+ */
+public interface KafkaTopicSelector extends Serializable {
+    /**
+     * @param tuple the tuple to select a topic for
+     * @return the name of the topic selected for the tuple
+     */
+    String getTopic(Tuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
new file mode 100644
index 0000000..8ad527d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ByTopicRecordTranslator.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Based off of a given Kafka topic a ConsumerRecord came from it will be translated to a Storm tuple
+ * and emitted to a given stream.
+ * @param <K> the key of the incoming Records
+ * @param <V> the value of the incoming Records
+ */
+public class ByTopicRecordTranslator<K, V> implements RecordTranslator<K, V> {
+    private static final long serialVersionUID = -121699733778988688L;
+    // Translator used for records whose topic has no entry in topicToTranslator.
+    private final RecordTranslator<K,V> defaultTranslator;
+    // Translators registered per topic through forTopic.
+    private final Map<String, RecordTranslator<K,V>> topicToTranslator = new HashMap<>();
+    // Fields accepted for each stream, collected from the default and the per-topic translators.
+    private final Map<String, Fields> streamToFields = new HashMap<>();
+
+    /**
+     * Create a simple record translator that will use func to extract the fields of the tuple,
+     * named by fields, and emit them to stream. This will handle all topics not explicitly set
+     * elsewhere.
+     * @param func extracts and turns them into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     * @param stream the stream to emit these fields on.
+     */
+    public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        this(new SimpleRecordTranslator<>(func, fields, stream));
+    }
+
+    /**
+     * Create a simple record translator that will use func to extract the fields of the tuple,
+     * named by fields, and emit them to the default stream. This will handle all topics not explicitly set
+     * elsewhere.
+     * @param func extracts and turns them into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     */
+    public ByTopicRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        this(new SimpleRecordTranslator<>(func, fields));
+    }
+
+    /**
+     * Create a translator that delegates to defaultTranslator until topics are configured.
+     * @param defaultTranslator a translator that will be used for all topics not explicitly set
+     * elsewhere.
+     */
+    public ByTopicRecordTranslator(RecordTranslator<K,V> defaultTranslator) {
+        this.defaultTranslator = defaultTranslator;
+        //This shouldn't throw on a Check, because nothing is configured yet
+        cacheNCheckFields(defaultTranslator);
+    }
+
+    /**
+     * Configure a translator for a given topic with tuples to be emitted to the default stream.
+     * @param topic the topic this should be used for
+     * @param func extracts and turns them into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     * @return this to be able to chain configuration
+     * @throws IllegalStateException if the topic is already registered to another translator
+     * @throws IllegalArgumentException if the Fields for the stream this emits to do not match any already configured Fields for the same stream
+     */
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        return forTopic(topic, new SimpleRecordTranslator<>(func, fields));
+    }
+
+    /**
+     * Configure a translator for a given topic.
+     * @param topic the topic this should be used for
+     * @param func extracts and turns them into a list of objects to be emitted
+     * @param fields the names of the fields extracted
+     * @param stream the stream to emit the tuples to.
+     * @return this to be able to chain configuration
+     * @throws IllegalStateException if the topic is already registered to another translator
+     * @throws IllegalArgumentException if the Fields for the stream this emits to do not match any already configured Fields for the same stream
+     */
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        return forTopic(topic, new SimpleRecordTranslator<>(func, fields, stream));
+    }
+
+    /**
+     * Configure a translator for a given kafka topic.
+     * When this method throws, neither the topic registration nor the cached stream Fields are changed.
+     * @param topic the topic this translator should handle
+     * @param translator the translator itself
+     * @return this to be able to chain configuration
+     * @throws IllegalStateException if the topic is already registered to another translator
+     * @throws IllegalArgumentException if the Fields for the stream this emits to do not match any already configured Fields for the same stream
+     */
+    public ByTopicRecordTranslator<K, V> forTopic(String topic, RecordTranslator<K,V> translator) {
+        if (topicToTranslator.containsKey(topic)) {
+            throw new IllegalStateException("Topic " + topic + " is already registered");
+        }
+        cacheNCheckFields(translator);
+        topicToTranslator.put(topic, translator);
+        return this;
+    }
+
+    /**
+     * Checks the Fields of every stream of translator against the Fields already cached for
+     * that stream and caches the Fields of streams seen for the first time.
+     * All streams are checked before anything is cached, so a rejected translator leaves
+     * streamToFields unchanged.
+     * @param translator the translator whose streams are checked and cached
+     * @throws IllegalArgumentException if a stream already has different Fields cached
+     */
+    private void cacheNCheckFields(RecordTranslator<K, V> translator) {
+        // Streams that are new to this instance; committed only once every stream passed the check.
+        Map<String, Fields> pending = new HashMap<>();
+        for (String stream : translator.streams()) {
+            Fields fromTrans = translator.getFieldsFor(stream);
+            // A stream listed twice by the same translator is compared against its first occurrence.
+            Fields cached = pending.get(stream);
+            if (cached == null) {
+                cached = streamToFields.get(stream);
+            }
+            if (cached != null && !fromTrans.equals(cached)) {
+                throw new IllegalArgumentException("Stream " + stream + " currently has Fields of " + cached + " which is not the same as those being added in " + fromTrans);
+            }
+
+            if (cached == null) {
+                pending.put(stream, fromTrans);
+            }
+        }
+        streamToFields.putAll(pending);
+    }
+
+    /**
+     * Translates record with the translator registered for its topic, or with the default
+     * translator when the topic has none.
+     * @param record the record to translate
+     * @return whatever the selected translator returns for the record
+     */
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        RecordTranslator<K, V> trans = topicToTranslator.get(record.topic());
+        if (trans == null) {
+            trans = defaultTranslator;
+        }
+        return trans.apply(record);
+    }
+
+    /**
+     * @param stream the stream to look up
+     * @return the Fields cached for stream, or null when no translator declared that stream
+     */
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return streamToFields.get(stream);
+    }
+
+    /**
+     * @return a new list holding the names of all streams declared by the configured translators
+     */
+    @Override
+    public List<String> streams() {
+        return new ArrayList<>(streamToFields.keySet());
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
new file mode 100644
index 0000000..4b0262b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/DefaultRecordTranslator.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+/**
+ * Translates a ConsumerRecord into the five values named by {@link #FIELDS}:
+ * topic, partition, offset, key and value.
+ * @param <K> the key of the incoming Records
+ * @param <V> the value of the incoming Records
+ */
+public class DefaultRecordTranslator<K, V> implements RecordTranslator<K, V> {
+    private static final long serialVersionUID = -5782462870112305750L;
+    public static final Fields FIELDS = new Fields("topic", "partition", "offset", "key", "value");
+
+    /**
+     * @param record the record to translate
+     * @return the record's topic, partition, offset, key and value, in that order
+     */
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        final String topic = record.topic();
+        final int partition = record.partition();
+        final long offset = record.offset();
+        final K key = record.key();
+        final V value = record.value();
+        return new Values(topic, partition, offset, key, value);
+    }
+
+    /**
+     * @param stream not consulted; the same Fields are returned for any stream
+     * @return {@link #FIELDS}
+     */
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return FIELDS;
+    }
+
+    /**
+     * @return DEFAULT_STREAM, which is not declared in this class
+     *     (presumably inherited from RecordTranslator - verify)
+     */
+    @Override
+    public List<String> streams() {
+        return DEFAULT_STREAM;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
new file mode 100644
index 0000000..621fecd
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/EmptyKafkaTupleListener.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+
+package org.apache.storm.kafka.spout;
+
+import java.util.Collection;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A KafkaTupleListener whose callbacks all have empty bodies, so no action is taken
+ * for any of the listener events.
+ */
+public final class EmptyKafkaTupleListener implements KafkaTupleListener {
+
+    @Override
+    public void open(Map<String, Object> conf, TopologyContext context) { }
+
+    @Override
+    public void onEmit(List<Object> tuple, KafkaSpoutMessageId msgId) { }
+
+    @Override
+    public void onAck(KafkaSpoutMessageId msgId) { }
+
+    @Override
+    public void onPartitionsReassigned(Collection<TopicPartition> topicPartitions) { }
+
+    @Override
+    public void onRetry(KafkaSpoutMessageId msgId) { }
+
+    @Override
+    public void onMaxRetryReached(KafkaSpoutMessageId msgId) { }
+
+    // Fixed name; instances carry no state to describe.
+    @Override
+    public String toString() {
+        return "EmptyKafkaTupleListener";
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
new file mode 100644
index 0000000..0414533
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Func.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+/**
+ * A simple interface to allow compatibility with non java 8
+ * code bases.
+ * @param <V> type of the value passed to apply
+ * @param <R> type of the result returned by apply
+ */
+public interface Func<V, R> extends Serializable {
+    /**
+     * Applies this function to the given value.
+     * @param record the input value
+     * @return the result computed for the input
+     */
+    R apply(V record);
+}