Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/31 21:00:08 UTC

[1/3] storm git commit: Added STORM-822 to Changelog

Repository: storm
Updated Branches:
  refs/heads/master 955b44552 -> cd0c0f733


Added STORM-822 to Changelog


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/cd0c0f73
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/cd0c0f73
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/cd0c0f73

Branch: refs/heads/master
Commit: cd0c0f73322fbaf81405be6cdaddeab080bd59b1
Parents: 332afc4
Author: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Authored: Thu Mar 31 13:25:15 2016 -0500
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 13:59:32 2016 -0500

----------------------------------------------------------------------
 CHANGELOG.md | 1 +
 1 file changed, 1 insertion(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/cd0c0f73/CHANGELOG.md
----------------------------------------------------------------------
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 3462d5d..15ff766 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,4 +1,5 @@
 ## 2.0.0
+ * STORM-822: Kafka Spout New Consumer API
  * STORM-1663: Stats couldn't handle null worker HB.
  * STORM-1665: Worker cannot instantiate kryo
  * STORM-1666: Kill from the UI fails silently


[3/3] storm git commit: STORM-822: Kafka Spout New Consumer API

Posted by bo...@apache.org.
STORM-822: Kafka Spout New Consumer API


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d26b984d
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d26b984d
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d26b984d

Branch: refs/heads/master
Commit: d26b984d74987c95badbdf8a73c74c4304bd4ec9
Parents: 955b445
Author: Hugo Louro <hm...@gmail.com>
Authored: Mon Dec 14 10:16:42 2015 -0800
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 13:59:32 2016 -0500

----------------------------------------------------------------------
 examples/storm-starter/pom.xml                  |  16 +-
 external/flux/flux-examples/pom.xml             |  13 +-
 external/flux/pom.xml                           |  13 +-
 external/sql/storm-sql-kafka/pom.xml            |  16 +-
 external/storm-kafka-client/README.md           |   9 +
 external/storm-kafka-client/pom.xml             |  86 ++++
 .../kafka/spout/KafkaRecordTupleBuilder.java    |  44 ++
 .../apache/storm/kafka/spout/KafkaSpout.java    | 503 +++++++++++++++++++
 .../storm/kafka/spout/KafkaSpoutConfig.java     | 298 +++++++++++
 .../storm/kafka/spout/KafkaSpoutMessageId.java  | 111 ++++
 .../storm/kafka/spout/KafkaSpoutStream.java     |  66 +++
 .../storm/kafka/spout/KafkaSpoutStreams.java    | 162 ++++++
 .../kafka/spout/KafkaSpoutTupleBuilder.java     |  28 ++
 .../spout/test/KafkaSpoutTopologyMain.java      | 120 +++++
 .../storm/kafka/spout/test/KafkaTestBolt.java   |  52 ++
 external/storm-kafka/pom.xml                    |  16 +-
 external/storm-solr/pom.xml                     |   8 +-
 pom.xml                                         |  57 ++-
 storm-dist/binary/src/main/assembly/binary.xml  |  14 +
 19 files changed, 1544 insertions(+), 88 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/examples/storm-starter/pom.xml
----------------------------------------------------------------------
diff --git a/examples/storm-starter/pom.xml b/examples/storm-starter/pom.xml
index 0b5fd97..112f1e8 100644
--- a/examples/storm-starter/pom.xml
+++ b/examples/storm-starter/pom.xml
@@ -158,26 +158,12 @@
       </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
-      <artifactId>kafka_2.10</artifactId>
-      <version>0.8.2.1</version>
-      <!-- use provided scope, so users can pull in whichever scala version they choose -->
+      <artifactId>${kafka.artifact.id}</artifactId>
       <scope>provided</scope>
-      <exclusions>
-        <exclusion>
-          <groupId>org.apache.zookeeper</groupId>
-          <artifactId>zookeeper</artifactId>
-        </exclusion>
-        <exclusion>
-          <groupId>log4j</groupId>
-          <artifactId>log4j</artifactId>
-        </exclusion>
-      </exclusions>
     </dependency>
     <dependency>
       <groupId>org.apache.kafka</groupId>
       <artifactId>kafka-clients</artifactId>
-      <version>0.8.2.1</version>
-      <scope>provided</scope>
     </dependency>
     <dependency>
       <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/flux/flux-examples/pom.xml
----------------------------------------------------------------------
diff --git a/external/flux/flux-examples/pom.xml b/external/flux/flux-examples/pom.xml
index 28d7239..48cc151 100644
--- a/external/flux/flux-examples/pom.xml
+++ b/external/flux/flux-examples/pom.xml
@@ -95,18 +95,7 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.1.1</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
+            <artifactId>${kafka.artifact.id}</artifactId>
         </dependency>
     </dependencies>
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/flux/pom.xml
----------------------------------------------------------------------
diff --git a/external/flux/pom.xml b/external/flux/pom.xml
index 1fd1683..56d9bab 100644
--- a/external/flux/pom.xml
+++ b/external/flux/pom.xml
@@ -78,19 +78,8 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.1.1</version>
+            <artifactId>${kafka.artifact.id}</artifactId>
             <scope>test</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>junit</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/sql/storm-sql-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/sql/storm-sql-kafka/pom.xml b/external/sql/storm-sql-kafka/pom.xml
index 450611e..0642d17 100644
--- a/external/sql/storm-sql-kafka/pom.xml
+++ b/external/sql/storm-sql-kafka/pom.xml
@@ -63,26 +63,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.2.1</version>
-            <!-- use provided scope, so users can pull in whichever scala version they choose -->
+            <artifactId>${kafka.artifact.id}</artifactId>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>0.8.2.1</version>
-            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>com.fasterxml.jackson.core</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/README.md b/external/storm-kafka-client/README.md
new file mode 100644
index 0000000..8ac15f5
--- /dev/null
+++ b/external/storm-kafka-client/README.md
@@ -0,0 +1,9 @@
+# Storm Kafka Spout New Consumer API
+
+This patch is still under development and comes with no warranties at this moment.
+
+It has not been thoroughly tested, so there may be bugs, and it is not yet ready for production.
+
+The documentation will be uploaded soon.
+
+To see how to use the new Kafka Spout, please refer to the example under the tests. Thank you!
\ No newline at end of file
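
Until that documentation lands, a minimal wiring sketch against the API introduced in this commit may help. Everything below is illustrative: the broker address, group id, deserializer choices, topology layout, and the streams argument (a KafkaSpoutStreams instance whose construction is not shown in this message) are assumptions; only the Builder, KafkaSpout, and KafkaRecordTupleBuilder signatures come from the diffs that follow.

import java.util.HashMap;
import java.util.Map;

import org.apache.storm.generated.StormTopology;
import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder;
import org.apache.storm.kafka.spout.KafkaSpout;
import org.apache.storm.kafka.spout.KafkaSpoutConfig;
import org.apache.storm.kafka.spout.KafkaSpoutStreams;
import org.apache.storm.topology.TopologyBuilder;

public class KafkaSpoutUsageSketch {
    // streams: maps topics to output streams/fields; its construction is not shown in this message
    public static StormTopology build(KafkaSpoutStreams streams) {
        Map<String, Object> kafkaProps = new HashMap<>();
        kafkaProps.put("bootstrap.servers", "localhost:9092");   // assumed broker address
        kafkaProps.put("group.id", "kafka-spout-example");       // assumed consumer group
        kafkaProps.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        kafkaProps.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaSpoutConfig<String, String> config =
                new KafkaSpoutConfig.Builder<String, String>(kafkaProps, streams)
                        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                        .setOffsetCommitPeriodMs(10_000)          // commit acked offsets every 10s
                        .build();

        TopologyBuilder builder = new TopologyBuilder();
        builder.setSpout("kafka-spout",
                new KafkaSpout<>(config, new KafkaRecordTupleBuilder<String, String>()), 1);
        return builder.createTopology();
    }
}
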

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/pom.xml b/external/storm-kafka-client/pom.xml
new file mode 100644
index 0000000..6c82b6a
--- /dev/null
+++ b/external/storm-kafka-client/pom.xml
@@ -0,0 +1,86 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--/**
+        * Licensed to the Apache Software Foundation (ASF) under one
+        * or more contributor license agreements.  See the NOTICE file
+        * distributed with this work for additional information
+        * regarding copyright ownership.  The ASF licenses this file
+        * to you under the Apache License, Version 2.0 (the
+        * "License"); you may not use this file except in compliance
+        * with the License.  You may obtain a copy of the License at
+        *
+        * http://www.apache.org/licenses/LICENSE-2.0
+        *
+        * Unless required by applicable law or agreed to in writing, software
+        * distributed under the License is distributed on an "AS IS" BASIS,
+        * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+        * See the License for the specific language governing permissions and
+        * limitations under the License.
+        */-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <artifactId>storm</artifactId>
+        <groupId>org.apache.storm</groupId>
+        <version>2.0.0-SNAPSHOT</version>
+        <relativePath>../../pom.xml</relativePath>
+    </parent>
+
+    <artifactId>storm-kafka-client</artifactId>
+    <name>storm-kafka-client</name>
+
+    <packaging>jar</packaging>
+
+    <developers>
+        <developer>
+            <id>hmcl</id>
+            <name>Hugo Louro</name>
+            <email>hmclouro@gmail.com</email>
+        </developer>
+    </developers>
+
+    <dependencies>
+        <!--parent module dependency-->
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <!--kafka libraries-->
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+        </dependency>
+        <!--test dependencies -->
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-all</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <version>4.11</version>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-jar-plugin</artifactId>
+                <version>2.5</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>test-jar</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
new file mode 100644
index 0000000..4d67632
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class KafkaRecordTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> {
+    @Override
+    public List<Object> buildTuple(final ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) {
+        final Fields outputFields = kafkaSpoutStreams.getOutputFields(consumerRecord.topic());
+        if (outputFields != null) {
+            if (outputFields.size() == 3) {
+                return new Values(consumerRecord.topic(),
+                        consumerRecord.partition(),
+                        consumerRecord.offset());
+            } else if (outputFields.size() == 5) {
+                return new Values(consumerRecord.topic(),
+                        consumerRecord.partition(),
+                        consumerRecord.offset(),
+                        consumerRecord.key(),
+                        consumerRecord.value());
+            }
+        }
+        throw new RuntimeException("Failed to build tuple. " + consumerRecord + " " + kafkaSpoutStreams);
+    }
+}
\ No newline at end of file
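
To make the arity dispatch above concrete, here is a hedged fragment; the topic, key, and value are made up, and streams is assumed to resolve "orders" to a declared Fields of size 5.

// Assumes streams.getOutputFields("orders") returns a Fields of size 5.
ConsumerRecord<String, String> record = new ConsumerRecord<>("orders", 0, 42L, "key-1", "value-1");
List<Object> tuple = new KafkaRecordTupleBuilder<String, String>().buildTuple(record, streams);
// tuple -> ["orders", 0, 42, "key-1", "value-1"]
// A 3-field declaration would yield ["orders", 0, 42]; any other arity throws a RuntimeException.
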

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
new file mode 100644
index 0000000..9a49ee8
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -0,0 +1,503 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.NavigableSet;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+public class KafkaSpout<K, V> extends BaseRichSpout {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+    private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
+
+    // Storm
+    protected SpoutOutputCollector collector;
+
+    // Kafka
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    private transient KafkaConsumer<K, V> kafkaConsumer;
+    private transient boolean consumerAutoCommitMode;
+
+
+    // Bookkeeping
+    private KafkaSpoutStreams kafkaSpoutStreams;
+    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
+    private transient Timer commitTimer;                                    // timer == null for auto commit mode
+    private transient Timer logTimer;
+    private transient Map<TopicPartition, OffsetEntry> acked;         // emitted tuples that were successfully acked. These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate
+    private transient int maxRetries;                                 // Max number of times a tuple is retried
+    private transient boolean initialized;          // Flag indicating that the spout is still undergoing the initialization process.
+    // Initialization is only complete after the first call to KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
+    private transient long numUncommittedOffsets;   // Number of offsets that have been polled and emitted but not yet been committed
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private transient PollStrategy pollStrategy;
+
+
+    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
+        this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
+        this.tupleBuilder = tupleBuilder;
+    }
+
+    @Override
+    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+        initialized = false;
+
+        // Spout internals
+        this.collector = collector;
+        maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
+        numUncommittedOffsets = 0;
+        logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
+
+        // Offset management
+        firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+        pollStrategy = kafkaSpoutConfig.getPollStrategy();
+        consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
+
+        if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
+            commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
+            acked = new HashMap<>();
+        }
+
+        LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
+    }
+
+    // =========== Consumer Rebalance Listener - On the same thread as the caller ===========
+
+    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            LOG.debug("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            if (!consumerAutoCommitMode && initialized) {
+                initialized = false;
+                commitOffsetsForAckedTuples();
+            }
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            LOG.debug("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+
+            initialize(partitions);
+        }
+
+        private void initialize(Collection<TopicPartition> partitions) {
+            if (!consumerAutoCommitMode) {
+                acked.keySet().retainAll(partitions);   // remove from acked all partitions that are no longer assigned to this spout
+            }
+
+            for (TopicPartition tp : partitions) {
+                final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
+                final long fetchOffset = doSeek(tp, committedOffset);
+                setAcked(tp, fetchOffset);
+            }
+            initialized = true;
+            LOG.debug("Initialization complete");
+        }
+
+        /**
+         * sets the cursor to the location dictated by the first poll strategy and returns the fetch offset
+         */
+        private long doSeek(TopicPartition tp, OffsetAndMetadata committedOffset) {
+            long fetchOffset;
+            if (committedOffset != null) {             // offset was committed for this TopicPartition
+                if (firstPollOffsetStrategy.equals(EARLIEST)) {
+                    kafkaConsumer.seekToBeginning(tp);
+                    fetchOffset = kafkaConsumer.position(tp);
+                } else if (firstPollOffsetStrategy.equals(LATEST)) {
+                    kafkaConsumer.seekToEnd(tp);
+                    fetchOffset = kafkaConsumer.position(tp);
+                } else {
+                    // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
+                    fetchOffset = committedOffset.offset() + 1;
+                    kafkaConsumer.seek(tp, fetchOffset);
+                }
+            } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
+                if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
+                    kafkaConsumer.seekToBeginning(tp);
+                } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
+                    kafkaConsumer.seekToEnd(tp);
+                }
+                fetchOffset = kafkaConsumer.position(tp);
+            }
+            return fetchOffset;
+        }
+    }
+
+    private void setAcked(TopicPartition tp, long fetchOffset) {
+        // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
+        if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
+            acked.put(tp, new OffsetEntry(tp, fetchOffset));
+        }
+    }
+
+    // ======== Next Tuple =======
+
+    @Override
+    public void nextTuple() {
+        if (initialized) {
+            if (commit()) {
+                commitOffsetsForAckedTuples();
+            } else if (poll()) {
+                emitTuples(pollKafkaBroker());
+            } else if (logTimer.isExpiredResetOnTrue()) {   // to limit the number of messages that get printed.
+                log();
+            }
+        } else {
+            LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
+        }
+    }
+
+    private void log() {
+        switch(pollStrategy) {
+            case STREAM:
+                LOG.trace("Reached the maximum number number of uncommitted records [{}]. " +
+                        "No more polls will occur until a sequence of commits sets the count under the [{}] threshold ",
+                        numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets());
+                break;
+            case BATCH:
+                LOG.trace("No more polls will occur until the last batch completes. [{}] emitted tuples pending", numUncommittedOffsets);
+                break;
+            default:
+                throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
+        }
+
+    }
+
+    // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory
+    private boolean poll()  {
+        switch(pollStrategy) {
+            case STREAM:
+                return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
+            case BATCH:
+                return consumerAutoCommitMode || numUncommittedOffsets <= 0;
+            default:
+                throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
+        }
+    }
+
+    private boolean commit() {
+        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+    }
+
+    private ConsumerRecords<K, V> pollKafkaBroker() {
+        final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
+        final int numPolledRecords = consumerRecords.count();
+        numUncommittedOffsets+= numPolledRecords;
+        LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
+        return consumerRecords;
+    }
+
+    private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
+        for (TopicPartition tp : consumerRecords.partitions()) {
+            final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic());
+
+            for (final ConsumerRecord<K, V> record : records) {
+                final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams);
+                final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple);
+
+                kafkaSpoutStreams.emit(collector, messageId);           // emits one tuple per record
+                LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+            }
+        }
+    }
+
+    private void commitOffsetsForAckedTuples() {
+        // Find offsets that are ready to be committed for every topic partition
+        final Map<TopicPartition, OffsetAndMetadata> nextCommitOffsets = new HashMap<>();
+        for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+            final OffsetAndMetadata nextCommitOffset = tpOffset.getValue().findNextCommitOffset();
+            if (nextCommitOffset != null) {
+                nextCommitOffsets.put(tpOffset.getKey(), nextCommitOffset);
+            }
+        }
+
+        // Commit offsets that are ready to be committed for every topic partition
+        if (!nextCommitOffsets.isEmpty()) {
+            kafkaConsumer.commitSync(nextCommitOffsets);
+            LOG.debug("Offsets successfully committed to Kafka [{}]", nextCommitOffsets);
+            // Instead of iterating again, it would be possible to commit and update the state for each TopicPartition
+            // in the prior loop, but the multiple network calls should be more expensive than iterating twice over a small loop
+            for (Map.Entry<TopicPartition, OffsetEntry> tpOffset : acked.entrySet()) {
+                final OffsetEntry offsetEntry = tpOffset.getValue();
+                offsetEntry.commit(nextCommitOffsets.get(tpOffset.getKey()));
+            }
+        } else {
+            LOG.trace("No offsets to commit. {}", this);
+        }
+    }
+
+    // ======== Ack =======
+
+    @Override
+    public void ack(Object messageId) {
+        if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
+            final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+            acked.get(msgId.getTopicPartition()).add(msgId);
+            LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked);
+        }
+    }
+
+    // ======== Fail =======
+
+    @Override
+    public void fail(Object messageId) {
+        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+        if (msgId.numFails() < maxRetries) {
+            msgId.incrementNumFails();
+            kafkaSpoutStreams.emit(collector, msgId);
+            LOG.trace("Retried tuple with message id [{}]", msgId);
+        } else { // limit to max number of retries
+            LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
+            ack(msgId);
+        }
+    }
+
+    // ======== Activate / Deactivate / Close / Declare Outputs =======
+
+    @Override
+    public void activate() {
+        subscribeKafkaConsumer();
+    }
+
+    private void subscribeKafkaConsumer() {
+        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
+                kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
+        kafkaConsumer.subscribe(kafkaSpoutConfig.getSubscribedTopics(), new KafkaSpoutConsumerRebalanceListener());
+        // Initial poll to get the consumer registration process going.
+        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
+        kafkaConsumer.poll(0);
+    }
+
+    @Override
+    public void deactivate() {
+        shutdown();
+    }
+
+    @Override
+    public void close() {
+        shutdown();
+    }
+
+    private void shutdown() {
+        try {
+            kafkaConsumer.wakeup();
+            if (!consumerAutoCommitMode) {
+                commitOffsetsForAckedTuples();
+            }
+        } finally {
+            //remove resources
+            kafkaConsumer.close();
+        }
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        kafkaSpoutStreams.declareOutputFields(declarer);
+    }
+
+    @Override
+    public String toString() {
+        return "{acked=" + acked + "} ";
+    }
+
+    // ======= Offsets Commit Management ==========
+
+    private static class OffsetComparator implements Comparator<KafkaSpoutMessageId> {
+        public int compare(KafkaSpoutMessageId m1, KafkaSpoutMessageId m2) {
+            return m1.offset() < m2.offset() ? -1 : m1.offset() == m2.offset() ? 0 : 1;
+        }
+    }
+
+    /**
+     * This class is not thread safe
+     */
+    private class OffsetEntry {
+        private final TopicPartition tp;
+        private final long initialFetchOffset;  /* First offset to be fetched. It is either set to the beginning, end, or to the first uncommitted offset.
+                                                 * Initial value depends on offset strategy. See KafkaSpoutConsumerRebalanceListener */
+        private long committedOffset;     // last offset committed to Kafka. Initially it is set to fetchOffset - 1
+        private final NavigableSet<KafkaSpoutMessageId> ackedMsgs = new TreeSet<>(OFFSET_COMPARATOR);     // acked messages sorted by ascending order of offset
+
+        public OffsetEntry(TopicPartition tp, long initialFetchOffset) {
+            this.tp = tp;
+            this.initialFetchOffset = initialFetchOffset;
+            this.committedOffset = initialFetchOffset - 1;
+            LOG.debug("Created OffsetEntry. {}", this);
+        }
+
+        public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
+            ackedMsgs.add(msgId);
+        }
+
+        /**
+         * @return the next OffsetAndMetadata to commit, or null if no offset is ready to commit.
+         */
+        public OffsetAndMetadata findNextCommitOffset() {
+            boolean found = false;
+            long currOffset;
+            long nextCommitOffset = committedOffset;
+            KafkaSpoutMessageId nextCommitMsg = null;     // this is a convenience variable to make it faster to create OffsetAndMetadata
+
+            for (KafkaSpoutMessageId currAckedMsg : ackedMsgs) {  // complexity is that of a linear scan on a TreeMap
+                if ((currOffset = currAckedMsg.offset()) == initialFetchOffset || currOffset == nextCommitOffset + 1) {            // found the next offset to commit
+                    found = true;
+                    nextCommitMsg = currAckedMsg;
+                    nextCommitOffset = currOffset;
+                    LOG.trace("Found offset to commit [{}]. {}", currOffset, this);
+                } else if (currAckedMsg.offset() > nextCommitOffset + 1) {    // offset found is not contiguous with the offsets to go in the next commit, so stop the search
+                    LOG.debug("Non continuous offset found [{}]. It will be processed in a subsequent batch. {}", currOffset, this);
+                    break;
+                } else {
+                    LOG.debug("Unexpected offset found [{}]. {}", currOffset, this);
+                    break;
+                }
+            }
+
+            OffsetAndMetadata nextCommitOffsetAndMetadata = null;
+            if (found) {
+                nextCommitOffsetAndMetadata = new OffsetAndMetadata(nextCommitOffset, nextCommitMsg.getMetadata(Thread.currentThread()));
+                LOG.debug("Offset to be committed next: [{}] {}", nextCommitOffsetAndMetadata.offset(), this);
+            } else {
+                LOG.debug("No offsets ready to commit. {}", this);
+            }
+            return nextCommitOffsetAndMetadata;
+        }
+
+        /**
+         * Marks an offset as committed. This method has side effects - it sets the internal state in such a way that future
+         * calls to {@link #findNextCommitOffset()} will return offsets greater than the offset specified, if any.
+         *
+         * @param committedOffset offset to be marked as committed
+         */
+        public void commit(OffsetAndMetadata committedOffset) {
+            if (committedOffset != null) {
+                final long numCommittedOffsets = committedOffset.offset() - this.committedOffset;
+                this.committedOffset = committedOffset.offset();
+                for (Iterator<KafkaSpoutMessageId> iterator = ackedMsgs.iterator(); iterator.hasNext(); ) {
+                    if (iterator.next().offset() <= committedOffset.offset()) {
+                        iterator.remove();
+                    } else {
+                        break;
+                    }
+                }
+                numUncommittedOffsets-= numCommittedOffsets;
+            }
+            LOG.trace("Object state after update: {}, numUncommittedOffsets [{}]", this, numUncommittedOffsets);
+        }
+
+        public boolean isEmpty() {
+            return ackedMsgs.isEmpty();
+        }
+
+        @Override
+        public String toString() {
+            return "OffsetEntry{" +
+                    "topic-partition=" + tp +
+                    ", fetchOffset=" + initialFetchOffset +
+                    ", committedOffset=" + committedOffset +
+                    ", ackedMsgs=" + ackedMsgs +
+                    '}';
+        }
+    }
+
+    // =========== Timer ===========
+
+    private class Timer {
+        private final long delay;
+        private final long period;
+        private final TimeUnit timeUnit;
+        private final long periodNanos;
+        private long start;
+
+        /**
+         * Creates a class that mimics a single-threaded timer that expires periodically. If a call to {@link
+         * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns
+         * true. Each time the method returns true the counter is reset. The timer starts with the specified time delay.
+         *
+         * @param delay    the initial delay before the timer starts
+         * @param period   the period between calls to {@link #isExpiredResetOnTrue()}
+         * @param timeUnit the time unit of delay and period
+         */
+        public Timer(long delay, long period, TimeUnit timeUnit) {
+            this.delay = delay;
+            this.period = period;
+            this.timeUnit = timeUnit;
+
+            periodNanos = timeUnit.toNanos(period);
+            start = System.nanoTime() + timeUnit.toNanos(delay);
+        }
+
+        public long period() {
+            return period;
+        }
+
+        public long delay() {
+            return delay;
+        }
+
+        public TimeUnit getTimeUnit() {
+            return timeUnit;
+        }
+
+        /**
+         * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
+         * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
+         * (re-initiated) and a new cycle will start.
+         *
+         * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
+         * otherwise.
+         */
+        public boolean isExpiredResetOnTrue() {
+            final boolean expired = System.nanoTime() - start > periodNanos;
+            if (expired) {
+                start = System.nanoTime();
+            }
+            return expired;
+        }
+    }
+}
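
The core of OffsetEntry.findNextCommitOffset() above is a linear scan for the longest contiguous run of acked offsets starting just past the last committed offset. Below is a self-contained sketch of that invariant; the names and the plain long-based representation are illustrative, not part of the patch, and the real method additionally handles the initial-fetch-offset case.

import java.util.NavigableSet;
import java.util.TreeSet;

public class NextCommitSketch {
    // Returns the highest offset that is safe to commit, i.e. the end of the
    // contiguous run of acked offsets starting at committedOffset + 1.
    static long nextCommitOffset(NavigableSet<Long> ackedOffsets, long committedOffset) {
        long nextCommit = committedOffset;
        for (long offset : ackedOffsets) {
            if (offset == nextCommit + 1) {
                nextCommit = offset;          // contiguous: commit point can advance
            } else if (offset > nextCommit + 1) {
                break;                        // gap: an earlier tuple is still pending ack
            }                                 // offsets <= nextCommit were already committed
        }
        return nextCommit;                    // equals committedOffset if nothing is committable
    }

    public static void main(String[] args) {
        NavigableSet<Long> acked = new TreeSet<>();
        acked.add(5L); acked.add(6L); acked.add(8L);
        // With committedOffset = 4, offsets 5 and 6 are committable; 8 waits until 7 is acked.
        System.out.println(nextCommitOffset(acked, 4L));  // prints 6
    }
}
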

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
new file mode 100644
index 0000000..d969f1f
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -0,0 +1,298 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.serialization.Deserializer;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the topics to subscribe to
+ */
+public class KafkaSpoutConfig<K, V> implements Serializable {
+    public static final long DEFAULT_POLL_TIMEOUT_MS = 2_000;            // 2s
+    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 15_000;   // 15s
+    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
+    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000;    // 10,000 records
+
+    // Kafka property names
+    public interface Consumer {
+        String GROUP_ID = "group.id";
+        String BOOTSTRAP_SERVERS = "bootstrap.servers";
+        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
+        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
+        String KEY_DESERIALIZER = "key.deserializer";
+        String VALUE_DESERIALIZER = "value.deserializer";
+    }
+
+    /**
+     * The offset used by the Kafka spout in the first poll to the Kafka broker. The choice of this parameter will
+     * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
+     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
+     * <ul>
+     * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li>
+     * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
+     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
+     * If no offset has been committed, it behaves as EARLIEST.</li>
+     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
+     * If no offset has been committed, it behaves as LATEST.</li>
+     * </ul>
+     * */
+    public enum FirstPollOffsetStrategy {
+        EARLIEST,
+        LATEST,
+        UNCOMMITTED_EARLIEST,
+        UNCOMMITTED_LATEST }
+
+    /**
+     * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
+     * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
+     * (it stores in memory the entire KafkaRecord, including data). BATCH will likely have lower throughput but also use less memory.
+     * The BATCH behavior is similar to the behavior of the previous Kafka Spout. The default value is STREAM.
+     * <ul>
+     *     <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
+     *     threshold has not yet been reached. When the threshold is reached, no more records are polled until enough offsets have been
+     *     committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
+     *     </li>
+     *     <li>BATCH Only polls a new batch of records from kafka once all the records that came in the previous poll have been acked.</li>
+     * </ul>
+     */
+    public enum PollStrategy {
+        STREAM,
+        BATCH
+    }
+
+    // Kafka consumer configuration
+    private final Map<String, Object> kafkaProps;
+    private final Deserializer<K> keyDeserializer;
+    private final Deserializer<V> valueDeserializer;
+    private final long pollTimeoutMs;
+
+    // Kafka spout configuration
+    private final long offsetCommitPeriodMs;
+    private final int maxRetries;
+    private final int maxUncommittedOffsets;
+    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final PollStrategy pollStrategy;
+    private final KafkaSpoutStreams kafkaSpoutStreams;
+
+    private KafkaSpoutConfig(Builder<K,V> builder) {
+        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+        this.keyDeserializer = builder.keyDeserializer;
+        this.valueDeserializer = builder.valueDeserializer;
+        this.pollTimeoutMs = builder.pollTimeoutMs;
+        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+        this.maxRetries = builder.maxRetries;
+        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+        this.pollStrategy = builder.pollStrategy;
+        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
+        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+    }
+
+    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
+        // set defaults for properties not specified
+        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
+            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+        }
+        return kafkaProps;
+    }
+
+    public static class Builder<K,V> {
+        private Map<String, Object> kafkaProps;
+        private Deserializer<K> keyDeserializer;
+        private Deserializer<V> valueDeserializer;
+        private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
+        private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
+        private int maxRetries = DEFAULT_MAX_RETRIES;
+        private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+        private KafkaSpoutStreams kafkaSpoutStreams;
+        private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
+        private PollStrategy pollStrategy = PollStrategy.STREAM;
+
+        /**
+         * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the topics to subscribe to.
+         * The optional configuration can be specified using the set methods of this builder.
+         * @param kafkaProps    properties defining the consumer connection to the Kafka broker, as specified in <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
+         * @param kafkaSpoutStreams    streams to which the tuples are emitted. Multiple topics can emit to the same stream.
+         */
+        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams) {
+            if (kafkaProps == null || kafkaProps.isEmpty()) {
+                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required. " + kafkaProps);
+            }
+
+            if (kafkaSpoutStreams == null)  {
+                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit in the same stream.");
+            }
+            this.kafkaProps = kafkaProps;
+            this.kafkaSpoutStreams = kafkaSpoutStreams;
+        }
+
+        /**
+         * Specifying this key deserializer overrides the property key.deserializer
+         */
+        public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
+            this.keyDeserializer = keyDeserializer;
+            return this;
+        }
+
+        /**
+         * Specifying this value deserializer overrides the property value.deserializer
+         */
+        public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
+            this.valueDeserializer = valueDeserializer;
+            return this;
+        }
+
+        /**
+         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s
+         * @param pollTimeoutMs time in ms
+         */
+        public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
+            this.pollTimeoutMs = pollTimeoutMs;
+            return this;
+        }
+
+        /**
+         * Specifies the period, in milliseconds, at which the offset commit task is called. Default is 15s.
+         * @param offsetCommitPeriodMs time in ms
+         */
+        public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+            this.offsetCommitPeriodMs = offsetCommitPeriodMs;
+            return this;
+        }
+
+        /**
+         * Defines the max number of retries in case of tuple failure. The default is to retry forever, which means that
+         * no new records are committed until the previously polled records have been acked. This guarantees at-least-once delivery of
+         * all the previously polled records.
+         * By specifying a finite value for maxRetries, the user sacrifices the delivery guarantee for the previously
+         * polled records in favor of processing more records.
+         * @param maxRetries max number of retries
+         */
+        public Builder<K,V> setMaxRetries(int maxRetries) {
+            this.maxRetries = maxRetries;
+            return this;
+        }
+
+        /**
+         * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
+         * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
+         * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+         * @param maxUncommittedOffsets max number of records that can be pending commit
+         */
+        public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+            this.maxUncommittedOffsets = maxUncommittedOffsets;
+            return this;
+        }
+
+        /**
+         * Sets the offset used by the Kafka spout in the first poll to the Kafka broker upon process start.
+         * Please refer to the documentation in {@link FirstPollOffsetStrategy}
+         * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
+         * */
+        public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
+            this.firstPollOffsetStrategy = firstPollOffsetStrategy;
+            return this;
+        }
+
+        /**
+         * Sets the strategy used by the Kafka spout to decide when to poll the next batch of records from Kafka.
+         * Please refer to the documentation in {@link PollStrategy}
+         * @param pollStrategy strategy used to decide when to poll
+         * */
+        public Builder<K, V> setPollStrategy(PollStrategy pollStrategy) {
+            this.pollStrategy = pollStrategy;
+            return this;
+        }
+
+        public KafkaSpoutConfig<K,V> build() {
+            return new KafkaSpoutConfig<>(this);
+        }
+    }
+
+    public Map<String, Object> getKafkaProps() {
+        return kafkaProps;
+    }
+
+    public Deserializer<K> getKeyDeserializer() {
+        return keyDeserializer;
+    }
+
+    public Deserializer<V> getValueDeserializer() {
+        return valueDeserializer;
+    }
+
+    public long getPollTimeoutMs() {
+        return pollTimeoutMs;
+    }
+
+    public long getOffsetsCommitPeriodMs() {
+        return offsetCommitPeriodMs;
+    }
+
+    public boolean isConsumerAutoCommitMode() {
+        return kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT) == null     // default is true
+                || Boolean.valueOf((String)kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT));
+    }
+
+    public String getConsumerGroupId() {
+        return (String) kafkaProps.get(Consumer.GROUP_ID);
+    }
+
+    public List<String> getSubscribedTopics() {
+        return new ArrayList<>(kafkaSpoutStreams.getTopics());
+    }
+
+    public int getMaxTupleRetries() {
+        return maxRetries;
+    }
+
+    public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
+        return firstPollOffsetStrategy;
+    }
+
+    public KafkaSpoutStreams getKafkaSpoutStreams() {
+        return kafkaSpoutStreams;
+    }
+
+    public int getMaxUncommittedOffsets() {
+        return maxUncommittedOffsets;
+    }
+
+    public PollStrategy getPollStrategy() {
+        return pollStrategy;
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutConfig{" +
+                "kafkaProps=" + kafkaProps +
+                ", keyDeserializer=" + keyDeserializer +
+                ", valueDeserializer=" + valueDeserializer +
+                ", topics=" + getSubscribedTopics() +
+                ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
+                ", pollTimeoutMs=" + pollTimeoutMs +
+                ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
+                ", maxRetries=" + maxRetries +
+                '}';
+    }
+}
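
One detail worth noting in the config above: the Builder forces enable.auto.commit to "false" whenever the property is absent, so builder-created configs put the spout in charge of committing acked offsets unless auto commit is requested explicitly. A hedged fragment illustrating both modes; the broker address, group id, and the streams instance are assumptions, and a fresh map is used for the second config because the builder keeps a reference to the map it is given.

Map<String, Object> props = new HashMap<>();
props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "localhost:9092");  // assumed broker
props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "example-group");            // assumed group

// Absent property: setDefaultsAndGetKafkaProps() sets enable.auto.commit to "false",
// so the spout tracks acked tuples and commits them every offsetCommitPeriodMs.
KafkaSpoutConfig<String, String> managed =
        new KafkaSpoutConfig.Builder<String, String>(props, streams).build();
assert !managed.isConsumerAutoCommitMode();

// Explicit opt-in: the KafkaConsumer commits on its own interval, and KafkaSpout.open()
// skips creating the commit timer and the acked-tuples map.
Map<String, Object> autoProps = new HashMap<>(props);
autoProps.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
KafkaSpoutConfig<String, String> auto =
        new KafkaSpoutConfig.Builder<String, String>(autoProps, streams).build();
assert auto.isConsumerAutoCommitMode();
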

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
new file mode 100644
index 0000000..0a6b126
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collections;
+import java.util.List;
+
+public class KafkaSpoutMessageId {
+    private transient TopicPartition topicPart;
+    private transient long offset;
+    private transient List<Object> tuple;
+    private transient int numFails = 0;
+
+    public KafkaSpoutMessageId(ConsumerRecord consumerRecord, List<Object> tuple) {
+        this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), tuple);
+    }
+
+    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, List<Object> tuple) {
+        this.topicPart = topicPart;
+        this.offset = offset;
+        this.tuple = tuple;
+    }
+
+    public int partition() {
+        return topicPart.partition();
+    }
+
+    public String topic() {
+        return topicPart.topic();
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public int numFails() {
+        return numFails;
+    }
+
+    public void incrementNumFails() {
+        ++numFails;
+    }
+
+    public TopicPartition getTopicPartition() {
+        return topicPart;
+    }
+
+    public List<Object> getTuple() {
+        return Collections.unmodifiableList(tuple);
+    }
+
+    public String getMetadata(Thread currThread) {
+        return "{" +
+                "topic-partition=" + topicPart +
+                ", offset=" + offset +
+                ", numFails=" + numFails +
+                ", thread='" + currThread.getName() + "'" +
+                '}';
+    }
+
+    @Override
+    public String toString() {
+        return "{" +
+                "topic-partition=" + topicPart +
+                ", offset=" + offset +
+                ", numFails=" + numFails +
+                ", tuple=" + tuple +
+                '}';
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        KafkaSpoutMessageId messageId = (KafkaSpoutMessageId) o;
+        if (offset != messageId.offset) {
+            return false;
+        }
+        return topicPart.equals(messageId.topicPart);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = topicPart.hashCode();
+        result = 31 * result + (int) (offset ^ (offset >>> 32));
+        return result;
+    }
+}
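
A note on the identity semantics above: equals() and hashCode() consider only the topic-partition and the offset, so a retried message (with a higher numFails) still matches its original id in the spout's bookkeeping collections. A minimal sketch, using hypothetical topic and offset values:

    // Sketch only: identity is (topic, partition, offset); tuple and numFails are ignored.
    TopicPartition tp = new TopicPartition("test", 0);
    List<Object> tuple = java.util.Arrays.asList("test", 0, 42L);
    KafkaSpoutMessageId first = new KafkaSpoutMessageId(tp, 42L, tuple);
    KafkaSpoutMessageId retried = new KafkaSpoutMessageId(tp, 42L, tuple);
    retried.incrementNumFails();
    assert first.equals(retried) && first.hashCode() == retried.hashCode();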

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
new file mode 100644
index 0000000..43464a9
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+
+import java.io.Serializable;
+
+/**
+ * Represents the stream and output fields used by a topic
+ */
+public class KafkaSpoutStream implements Serializable {
+    private final Fields outputFields;
+    private final String streamId;
+    private final String topic;
+
+    /** Declares the specified output fields on the default stream for the specified topic */
+    KafkaSpoutStream(Fields outputFields, String topic) {
+        this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
+    }
+
+    /** Declares the specified output fields on the specified stream for the specified topic */
+    KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
+        this.outputFields = outputFields;
+        this.streamId = streamId;
+        this.topic = topic;
+    }
+
+    public Fields getOutputFields() {
+        return outputFields;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public String getTopic() {
+        return topic;
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutStream{" +
+                "outputFields=" + outputFields +
+                ", streamId='" + streamId + '\'' +
+                ", topic='" + topic + '\'' +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
new file mode 100644
index 0000000..30215d1
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -0,0 +1,162 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.OutputFieldsGetter;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.utils.Utils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Represents the output streams associated with each topic, and provides a public API to
+ * declare output streams and emit tuples on the appropriate stream for all the topics specified.
+ */
+public class KafkaSpoutStreams implements Serializable {
+    protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreams.class);
+
+    private final Map<String, KafkaSpoutStream> topicToStream;
+
+    private KafkaSpoutStreams(Builder builder) {
+        this.topicToStream = builder.topicToStream;
+        LOG.debug("Built {}", this);
+    }
+
+    /**
+     * @param topic the topic for which to get output fields
+     * @return the output fields declared
+     */
+    public Fields getOutputFields(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            final Fields outputFields = topicToStream.get(topic).getOutputFields();
+            LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields);
+            return outputFields;
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @param topic the topic for which to get the stream id
+     * @return the id of the stream to where the tuples are emitted
+     */
+    public String getStreamId(String topic) {
+        if (topicToStream.containsKey(topic)) {
+            final String streamId = topicToStream.get(topic).getStreamId();
+            LOG.trace("Topic [{}] emitting in stream [{}]", topic, streamId);
+            return streamId;
+        }
+        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
+    }
+
+    /**
+     * @return the list of subscribed topics, each of which emits tuples to the stream configured in its {@link KafkaSpoutStream}
+     */
+    public List<String> getTopics() {
+        return new ArrayList<>(topicToStream.keySet());
+    }
+
+    void declareOutputFields(OutputFieldsDeclarer declarer) {
+        for (KafkaSpoutStream stream : topicToStream.values()) {
+            if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) {
+                declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
+                LOG.debug("Declared " + stream);
+            }
+        }
+    }
+
+    void emit(SpoutOutputCollector collector, KafkaSpoutMessageId messageId) {
+        collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutStreams{" +
+                "topicToStream=" + topicToStream +
+                '}';
+    }
+
+    public static class Builder {
+        private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();
+
+        /**
+         * Creates a {@link KafkaSpoutStream} for each of the specified topics.
+         * All the topics share the default stream id and the specified output fields.
+         */
+        public Builder(Fields outputFields, String... topics) {
+            this(outputFields, Utils.DEFAULT_STREAM_ID, topics);
+        }
+
+        /**
+         * Creates a {@link KafkaSpoutStream} for each of the specified topics.
+         * All the topics share the specified stream id and output fields.
+         */
+        public Builder(Fields outputFields, String streamId, String... topics) {
+            for (String topic : topics) {
+                topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
+            }
+        }
+
+        /**
+         * Creates a builder initialized with the specified stream for its topic
+         */
+        public Builder(KafkaSpoutStream stream) {
+            topicToStream.put(stream.getTopic(), stream);
+        }
+
+        /**
+         * Adds this stream to the state representing the streams associated with each topic
+         */
+        public Builder addStream(KafkaSpoutStream stream) {
+            topicToStream.put(stream.getTopic(), stream);
+            return this;
+        }
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Fields, String...)}
+         */
+        public Builder addStream(Fields outputFields, String... topics) {
+            for (String topic : topics) {
+                topicToStream.put(topic, new KafkaSpoutStream(outputFields, topic));
+            }
+            return this;
+        }
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Fields, String, String...)}
+         */
+        public Builder addStream(Fields outputFields, String streamId, String... topics) {
+            for (String topic : topics) {
+                topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
+            }
+            return this;
+        }
+
+        public KafkaSpoutStreams build() {
+            return new KafkaSpoutStreams(this);
+        }
+    }
+}
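
To make the Builder contract above concrete, here is a minimal sketch declaring two streams; the topic and stream names are hypothetical:

    // Sketch only: "events" records carry five fields on "events_stream";
    // "audit" records carry three fields on "audit_stream".
    KafkaSpoutStreams streams = new KafkaSpoutStreams.Builder(
                new Fields("topic", "partition", "offset", "key", "value"), "events_stream", "events")
            .addStream(new Fields("topic", "partition", "offset"), "audit_stream", "audit")
            .build();

The spout then declares each stream once in declareOutputFields() and routes every tuple to the stream configured for its topic.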

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
new file mode 100644
index 0000000..45aab48
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+
+import java.io.Serializable;
+import java.util.List;
+
+public interface KafkaSpoutTupleBuilder<K,V> extends Serializable {
+    List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams);
+}
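
A minimal sketch of an implementation, mapping each record to the five-field layout used elsewhere in this patch (org.apache.storm.tuple.Values is assumed imported; the class name is hypothetical):

    public class PassThroughTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> {
        @Override
        public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) {
            // One tuple per record; field order must match the Fields declared in KafkaSpoutStreams
            return new Values(consumerRecord.topic(),
                    consumerRecord.partition(),
                    consumerRecord.offset(),
                    consumerRecord.key(),
                    consumerRecord.value());
        }
    }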

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
new file mode 100644
index 0000000..4fcc3ef
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.Config;
+import org.apache.storm.LocalCluster;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutStreams;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.topology.TopologyBuilder;
+import org.apache.storm.tuple.Fields;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy.STREAM;
+
+public class KafkaSpoutTopologyMain {
+    private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
+    private static final String[] TOPICS = new String[]{"test","test1","test2"};
+
+
+    public static void main(String[] args) throws Exception {
+        if (args.length == 0) {
+            submitTopologyLocalCluster(getTopologyKafkaSpout(), getConfig());
+        } else {
+            submitTopologyRemoteCluster(args[0], getTopologyKafkaSpout(), getConfig());
+        }
+    }
+
+    protected static void submitTopologyLocalCluster(StormTopology topology, Config config) throws InterruptedException {
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("test", config, topology);
+        stopWaitingForInput();
+    }
+
+    protected static void submitTopologyRemoteCluster(String arg, StormTopology topology, Config config) throws Exception {
+        StormSubmitter.submitTopology(arg, config, topology);
+    }
+
+    private static void stopWaitingForInput() {
+        try {
+            System.out.println("PRESS ENTER TO STOP");
+            new BufferedReader(new InputStreamReader(System.in)).readLine();
+            System.exit(0);
+        } catch (IOException e) {
+            e.printStackTrace();
+        }
+    }
+
+    protected static Config getConfig() {
+        Config config = new Config();
+        config.setDebug(true);
+        return config;
+    }
+
+    public static StormTopology getTopologyKafkaSpout() {
+        final TopologyBuilder tp = new TopologyBuilder();
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()), getTupleBuilder()), 1);
+        tp.setBolt("kafka_bolt", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+        tp.setBolt("kafka_bolt_1", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+        return tp.createTopology();
+    }
+
+    public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
+        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams)
+                .setOffsetCommitPeriodMs(10_000)
+                .setFirstPollOffsetStrategy(EARLIEST)
+                .setPollStrategy(STREAM)
+                .setMaxUncommittedOffsets(250)
+                .build();
+    }
+
+    public static Map<String,Object> getKafkaConsumerProps() {
+        Map<String, Object> props = new HashMap<>();
+//        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
+        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:9092");
+        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
+        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
+        return props;
+    }
+
+    public static KafkaSpoutTupleBuilder<String,String> getTupleBuilder() {
+        return new KafkaRecordTupleBuilder<>();
+    }
+
+    public static KafkaSpoutStreams getKafkaSpoutStreams() {
+        final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
+        final Fields outputFields1 = new Fields("topic", "partition", "offset");
+        return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test and test1 sent to test_stream
+                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
+                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
+                .build();
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
new file mode 100644
index 0000000..c9ff9d5
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KafkaTestBolt extends BaseRichBolt {
+    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBolt.class);
+
+
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        LOG.debug("input = [" + input + "]");
+        collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-kafka/pom.xml b/external/storm-kafka/pom.xml
index 43b7796..763c15f 100644
--- a/external/storm-kafka/pom.xml
+++ b/external/storm-kafka/pom.xml
@@ -118,26 +118,12 @@
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
-            <artifactId>kafka_2.10</artifactId>
-            <version>0.8.2.1</version>
-            <!-- use provided scope, so users can pull in whichever scala version they choose -->
+            <artifactId>${kafka.artifact.id}</artifactId>
             <scope>provided</scope>
-            <exclusions>
-                <exclusion>
-                    <groupId>org.apache.zookeeper</groupId>
-                    <artifactId>zookeeper</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>log4j</groupId>
-                    <artifactId>log4j</artifactId>
-                </exclusion>
-            </exclusions>
         </dependency>
         <dependency>
             <groupId>org.apache.kafka</groupId>
             <artifactId>kafka-clients</artifactId>
-            <version>0.8.2.1</version>
-            <scope>provided</scope>
         </dependency>
         <dependency>
             <groupId>org.apache.storm</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/external/storm-solr/pom.xml
----------------------------------------------------------------------
diff --git a/external/storm-solr/pom.xml b/external/storm-solr/pom.xml
index d093ae8..ba79ddc 100644
--- a/external/storm-solr/pom.xml
+++ b/external/storm-solr/pom.xml
@@ -31,10 +31,10 @@
 
     <developers>
         <developer>
-            <id>Hugo-Louro</id>
-            <name>Hugo Louro</name>
-            <email>hmclouro@gmail.com</email>
-        </developer>
+            <id>hmcl</id>
+            <name>Hugo Louro</name>
+            <email>hmclouro@gmail.com</email>
+        </developer>
     </developers>
 
     <dependencies>

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index 12e5a9f..1a899b3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -82,7 +82,7 @@
             <roles>
                 <role>Committer</role>
             </roles>
-            <timezone />
+            <timezone/>
         </developer>
         <developer>
             <id>afeng</id>
@@ -109,7 +109,7 @@
             <roles>
                 <role>Committer</role>
             </roles>
-            <timezone />
+            <timezone/>
         </developer>
         <developer>
             <id>jjackson</id>
@@ -249,6 +249,8 @@
         <calcite.version>1.4.0-incubating</calcite.version>
         <jackson.version>2.6.3</jackson.version>
         <maven-surefire.version>2.18.1</maven-surefire.version>
+        <kafka.version>0.9.0.1</kafka.version>
+        <kafka.artifact.id>kafka_2.11</kafka.artifact.id>
 
         <!-- Java and clojure build lifecycle test properties are defined here to avoid having to create a default profile -->
         <java.unit.test.exclude>org.apache.storm.testing.IntegrationTest</java.unit.test.exclude>
@@ -282,6 +284,7 @@
         <module>external/storm-mongodb</module>
         <module>examples/storm-starter</module>
         <module>storm-clojure</module>
+        <module>external/storm-kafka-client</module>
     </modules>
 
     <dependencies>
@@ -673,14 +676,14 @@
                 <version>${ring-json.version}</version>
             </dependency>
             <dependency>
-	      <groupId>org.eclipse.jetty</groupId>
-	      <artifactId>jetty-servlet</artifactId>
-	      <version>${jetty.version}</version>
+                <groupId>org.eclipse.jetty</groupId>
+                <artifactId>jetty-servlet</artifactId>
+                <version>${jetty.version}</version>
             </dependency>
             <dependency>
-	      <groupId>org.eclipse.jetty</groupId>
-	      <artifactId>jetty-servlets</artifactId>
-	      <version>${jetty.version}</version>
+                <groupId>org.eclipse.jetty</groupId>
+                <artifactId>jetty-servlets</artifactId>
+                <version>${jetty.version}</version>
             </dependency>
             <dependency>
                 <groupId>org.eclipse.jetty</groupId>
@@ -831,7 +834,7 @@
                 <version>${thrift.version}</version>
                 <scope>compile</scope>
             </dependency>
-			<!-- used by examples/storm-starter -->
+            <!-- used by examples/storm-starter -->
             <dependency>
                 <groupId>junit</groupId>
                 <artifactId>junit</artifactId>
@@ -839,14 +842,38 @@
                 <scope>test</scope>
             </dependency>
             <dependency>
-              <groupId>org.apache.calcite</groupId>
-              <artifactId>calcite-core</artifactId>
-              <version>${calcite.version}</version>
+                <groupId>org.apache.calcite</groupId>
+                <artifactId>calcite-core</artifactId>
+                <version>${calcite.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>com.fasterxml.jackson.core</groupId>
+                <artifactId>jackson-databind</artifactId>
+                <version>${jackson.version}</version>
+            </dependency>
+            <dependency>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>${kafka.artifact.id}</artifactId>
+                <version>${kafka.version}</version>
+                <exclusions>
+                    <exclusion>
+                        <groupId>org.apache.zookeeper</groupId>
+                        <artifactId>zookeeper</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>log4j</groupId>
+                        <artifactId>log4j</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-log4j12</artifactId>
+                    </exclusion>
+                </exclusions>
             </dependency>
             <dependency>
-              <groupId>com.fasterxml.jackson.core</groupId>
-              <artifactId>jackson-databind</artifactId>
-              <version>${jackson.version}</version>
+                <groupId>org.apache.kafka</groupId>
+                <artifactId>kafka-clients</artifactId>
+                <version>${kafka.version}</version>
             </dependency>
             <dependency>
                 <groupId>uk.org.lidalia</groupId>

http://git-wip-us.apache.org/repos/asf/storm/blob/d26b984d/storm-dist/binary/src/main/assembly/binary.xml
----------------------------------------------------------------------
diff --git a/storm-dist/binary/src/main/assembly/binary.xml b/storm-dist/binary/src/main/assembly/binary.xml
index 7f0da6f..648640e 100644
--- a/storm-dist/binary/src/main/assembly/binary.xml
+++ b/storm-dist/binary/src/main/assembly/binary.xml
@@ -317,6 +317,20 @@
                 <include>README.*</include>
             </includes>
         </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-kafka-client/target</directory>
+            <outputDirectory>external/storm-kafka-client</outputDirectory>
+            <includes>
+                <include>storm*jar</include>
+            </includes>
+        </fileSet>
+        <fileSet>
+            <directory>${project.basedir}/../../external/storm-kafka-client</directory>
+            <outputDirectory>external/storm-kafka-client</outputDirectory>
+            <includes>
+                <include>README.*</include>
+            </includes>
+        </fileSet>
 
         <!-- $STORM_HOME/extlib -->
         <fileSet>


[2/3] storm git commit: STORM-822: Kafka Spout New Consumer API - Refactored code to avoid keeping records data inside spout state - Refactored code to specify output fields per stream and build tuples per topic - Implement exponential backoff retry strategy

Posted by bo...@apache.org.
STORM-822: Kafka Spout New Consumer API
 - Refactored code to avoid keeping records data inside spout state
 - Refactored code to specify output fields per stream and build tuples per topic
 - Implement exponential backoff retry strategy
 - Send one tuple per call to nextTuple
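
For readers skimming the diff: the retry schedule is owned by the new KafkaSpoutRetryExponentialBackoff / KafkaSpoutRetryService classes listed below. As a rough, illustrative sketch only (the committed formula and bounds live in KafkaSpoutRetryExponentialBackoff), an exponential backoff typically computes the next retry delay as:

    // Illustrative only: delay(n) = min(initialDelay + delayPeriod * 2^n, maxDelay)
    static long nextDelayMs(long initialDelayMs, long delayPeriodMs, long maxDelayMs, int numFails) {
        long backoff = delayPeriodMs * (1L << Math.min(numFails, 62));  // cap the shift to avoid overflow
        return Math.min(initialDelayMs + backoff, maxDelayMs);
    }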


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/332afc40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/332afc40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/332afc40

Branch: refs/heads/master
Commit: 332afc40d8ee7c1f4a4a747280ff83a92c279c5d
Parents: d26b984
Author: Hugo Louro <hm...@gmail.com>
Authored: Mon Mar 21 14:42:50 2016 -0700
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 13:59:32 2016 -0500

----------------------------------------------------------------------
 .../kafka/spout/KafkaRecordTupleBuilder.java    |  44 ---
 .../apache/storm/kafka/spout/KafkaSpout.java    | 160 +++++++----
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  87 +++---
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  16 +-
 .../KafkaSpoutRetryExponentialBackoff.java      | 281 +++++++++++++++++++
 .../kafka/spout/KafkaSpoutRetryService.java     |  72 +++++
 .../storm/kafka/spout/KafkaSpoutStream.java     |  14 +-
 .../storm/kafka/spout/KafkaSpoutStreams.java    |  26 +-
 .../kafka/spout/KafkaSpoutTupleBuilder.java     |  34 ++-
 .../kafka/spout/KafkaSpoutTuplesBuilder.java    |  82 ++++++
 .../kafka/spout/test/KafkaSpoutTestBolt.java    |  50 ++++
 .../spout/test/KafkaSpoutTopologyMain.java      |  37 ++-
 .../storm/kafka/spout/test/KafkaTestBolt.java   |  52 ----
 .../spout/test/TopicTest2TupleBuilder.java      |  40 +++
 .../test/TopicsTest0Test1TupleBuilder.java      |  42 +++
 15 files changed, 798 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
deleted file mode 100644
index 4d67632..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class KafkaRecordTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> {
-    @Override
-    public List<Object> buildTuple(final ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) {
-        final Fields outputFields = kafkaSpoutStreams.getOutputFields(consumerRecord.topic());
-        if (outputFields != null) {
-            if (outputFields.size() == 3) {
-                return new Values(consumerRecord.topic(),
-                        consumerRecord.partition(),
-                        consumerRecord.offset());
-            } else if (outputFields.size() == 5) {
-                return new Values(consumerRecord.topic(),
-                        consumerRecord.partition(),
-                        consumerRecord.offset(),
-                        consumerRecord.key(),
-                        consumerRecord.value());
-            }
-        }
-        throw new RuntimeException("Failed to build tuple. " + consumerRecord + " " + kafkaSpoutStreams);
-    }
-}
\ No newline at end of file
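
The generic builder above is replaced by per-topic builders (TopicsTest0Test1TupleBuilder and TopicTest2TupleBuilder in the file list), composed through the new KafkaSpoutTuplesBuilder. A hedged sketch of what such a builder might look like, assuming the reworked KafkaSpoutTupleBuilder base class takes its topics in the constructor and leaves a single buildTuple(ConsumerRecord) to implement:

    // Sketch under an assumed post-refactor API; see KafkaSpoutTupleBuilder.java in this commit for the actual contract.
    public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
        public TopicTest2TupleBuilder(String... topics) {
            super(topics);
        }

        @Override
        public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
            return new Values(consumerRecord.topic(), consumerRecord.partition(), consumerRecord.offset());
        }
    }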

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9a49ee8..d211ae9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -34,12 +33,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
@@ -62,23 +65,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
 
     // Bookkeeping
-    private KafkaSpoutStreams kafkaSpoutStreams;
-    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
-    private transient Timer commitTimer;                                    // timer == null for auto commit mode
-    private transient Timer logTimer;
-    private transient Map<TopicPartition, OffsetEntry> acked;         // emitted tuples that were successfully acked. These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate
-    private transient int maxRetries;                                 // Max number of times a tuple is retried
-    private transient boolean initialized;          // Flag indicating that the spout is still undergoing initialization process.
+    private transient int maxRetries;                                   // Max number of times a tuple is retried
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first poll realized by the spout upon activation
+    private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
+    private transient Timer commitTimer;                                // timer == null for auto commit mode
+    private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
-    private transient long numUncommittedOffsets;   // Number of offsets that have been polled and emitted but not yet been committed
-    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private transient PollStrategy pollStrategy;
 
+    private KafkaSpoutStreams kafkaSpoutStreams;                        // Object that wraps all the logic to declare output fields and emit tuples
+    private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;      // Object that contains the logic to build tuples for each ConsumerRecord
 
-    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
+    private transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+    private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
+
+
+    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
         this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
-        this.tupleBuilder = tupleBuilder;
     }
 
     @Override
@@ -89,18 +94,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         this.collector = collector;
         maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
         numUncommittedOffsets = 0;
-        logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
 
         // Offset management
         firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
-        pollStrategy = kafkaSpoutConfig.getPollStrategy();
         consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
 
+        // Retries management
+        retryService = kafkaSpoutConfig.getRetryService();
+
+        // Tuples builder delegate
+        tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+
         if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
             commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
-            acked = new HashMap<>();
         }
 
+        acked = new HashMap<>();
+        emitted = new HashSet<>();
+        waitingToEmit = Collections.emptyListIterator();
+
         LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
     }
 
@@ -130,6 +142,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 acked.keySet().retainAll(partitions);   // remove from acked all partitions that are no longer assigned to this spout
             }
 
+            retryService.retainAll(partitions);
+
             for (TopicPartition tp : partitions) {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
                 final long fetchOffset = doSeek(tp, committedOffset);
@@ -182,67 +196,88 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         if (initialized) {
             if (commit()) {
                 commitOffsetsForAckedTuples();
-            } else if (poll()) {
-                emitTuples(pollKafkaBroker());
-            } else if (logTimer.isExpiredResetOnTrue()) {   // to limit the number of messages that get printed.
-                log();
+            }
+
+            if (poll()) {
+                setWaitingToEmit(pollKafkaBroker());
+            }
+
+            if (waitingToEmit()) {
+                emit();
             }
         } else {
             LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
         }
     }
 
-    private void log() {
-        switch(pollStrategy) {
-            case STREAM:
-                LOG.trace("Reached the maximum number number of uncommitted records [{}]. " +
-                        "No more polls will occur until a sequence of commits sets the count under the [{}] threshold ",
-                        numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets());
-                break;
-            case BATCH:
-                LOG.trace("No more polls will occur until the last batch completes. [{}] emitted tuples pending", numUncommittedOffsets);
-                break;
-            default:
-                throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
-        }
+    private boolean commit() {
+        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+    }
 
+    private boolean poll() {
+        return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
     }
 
-    // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory
-    private boolean poll()  {
-        switch(pollStrategy) {
-            case STREAM:
-                return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
-            case BATCH:
-                return consumerAutoCommitMode || numUncommittedOffsets <= 0;
-            default:
-                throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
-        }
+    private boolean waitingToEmit() {
+        return waitingToEmit != null && waitingToEmit.hasNext();
     }
 
-    private boolean commit() {
-        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+    public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
+        List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>();
+        for (TopicPartition tp : consumerRecords.partitions()) {
+            waitingToEmitList.addAll(consumerRecords.records(tp));
+        }
+        waitingToEmit = waitingToEmitList.iterator();
+        LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
     }
 
+    // ======== poll =========
     private ConsumerRecords<K, V> pollKafkaBroker() {
+        doSeekRetriableTopicPartitions();
+
         final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
         final int numPolledRecords = consumerRecords.count();
-        numUncommittedOffsets+= numPolledRecords;
         LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
         return consumerRecords;
     }
 
-    private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
-        for (TopicPartition tp : consumerRecords.partitions()) {
-            final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic());
+    private void doSeekRetriableTopicPartitions() {
+        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
+
+        for (TopicPartition rtp : retriableTopicPartitions) {
+            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
+            if (offsetAndMeta != null) {
+                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
+            } else {
+                kafkaConsumer.seekToEnd(rtp);    // no offset ready to commit, so seek to the end of the partition
+            }
+        }
+    }
 
-            for (final ConsumerRecord<K, V> record : records) {
-                final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams);
-                final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple);
+    // ======== emit  =========
+    private void emit() {
+        emitTupleIfNotEmitted(waitingToEmit.next());
+        waitingToEmit.remove();
+    }
 
-                kafkaSpoutStreams.emit(collector, messageId);           // emits one tuple per record
-                LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+    // emits one tuple per record
+    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
+        final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+        final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+
+        if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
+            LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
+        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
+            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
+        } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (first emission); otherwise it must be ready for retry
+            final List<Object> tuple = tuplesBuilder.buildTuple(record);
+            kafkaSpoutStreams.emit(collector, tuple, msgId);
+            emitted.add(msgId);
+            numUncommittedOffsets++;
+            if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
+                retryService.remove(msgId);  // re-emitted hence remove from failed
             }
+            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
         }
     }
 
@@ -275,11 +310,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public void ack(Object messageId) {
+        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
-            final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
             acked.get(msgId.getTopicPartition()).add(msgId);
             LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked);
         }
+        emitted.remove(msgId);
     }
 
     // ======== Fail =======
@@ -287,10 +323,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void fail(Object messageId) {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+        emitted.remove(msgId);
         if (msgId.numFails() < maxRetries) {
             msgId.incrementNumFails();
-            kafkaSpoutStreams.emit(collector, msgId);
-            LOG.trace("Retried tuple with message id [{}]", msgId);
+            retryService.schedule(msgId);
         } else { // limit to max number of retries
             LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
             ack(msgId);
@@ -367,7 +403,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             this.tp = tp;
             this.initialFetchOffset = initialFetchOffset;
             this.committedOffset = initialFetchOffset - 1;
-            LOG.debug("Created OffsetEntry. {}", this);
+            LOG.debug("Instantiated {}", this);
         }
 
         public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
@@ -434,6 +470,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             return ackedMsgs.isEmpty();
         }
 
+        public boolean contains(ConsumerRecord record) {
+            return contains(new KafkaSpoutMessageId(record));
+        }
+
+        public boolean contains(KafkaSpoutMessageId msgId) {
+            return ackedMsgs.contains(msgId);
+        }
+
         @Override
         public String toString() {
             return "OffsetEntry{" +

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index d969f1f..29cedb2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,7 +18,9 @@
 
 package org.apache.storm.kafka.spout;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -63,24 +65,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         UNCOMMITTED_EARLIEST,
         UNCOMMITTED_LATEST }
 
-    /**
-     * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
-     * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
-     * (it stores in memory the entire KafkaRecord,including data). BATCH will likely have less throughput but also use less memory.
-     * The BATCH behavior is similar to the behavior of the previous Kafka Spout. De default value is STREAM.
-     * <ul>
-     *     <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
-     *     threshold has not yet been reached. When the threshold his reached, no more records are polled until enough offsets have been
-     *     committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
-     *     </li>
-     *     <li>BATCH Only polls a new batch of records from kafka once all the records that came in the previous poll have been acked.</li>
-     * </ul>
-     */
-    public enum PollStrategy {
-        STREAM,
-        BATCH
-    }
-
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
     private final Deserializer<K> keyDeserializer;
@@ -92,8 +76,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final int maxRetries;
     private final int maxUncommittedOffsets;
     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private final PollStrategy pollStrategy;
     private final KafkaSpoutStreams kafkaSpoutStreams;
+    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
+    private final KafkaSpoutRetryService retryService;
 
     private KafkaSpoutConfig(Builder<K,V> builder) {
         this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
@@ -103,9 +88,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
         this.maxRetries = builder.maxRetries;
         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-        this.pollStrategy = builder.pollStrategy;
         this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+        this.tuplesBuilder = builder.tuplesBuilder;
+        this.retryService = builder.retryService;
     }
 
     private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
@@ -117,33 +103,61 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     }
 
     public static class Builder<K,V> {
-        private Map<String, Object> kafkaProps;
+        private final Map<String, Object> kafkaProps;
         private Deserializer<K> keyDeserializer;
         private Deserializer<V> valueDeserializer;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
         private int maxRetries = DEFAULT_MAX_RETRIES;
         private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-        private KafkaSpoutStreams kafkaSpoutStreams;
+        private final KafkaSpoutStreams kafkaSpoutStreams;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
-        private PollStrategy pollStrategy = PollStrategy.STREAM;
+        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
+        private final KafkaSpoutRetryService retryService;
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
+         * By default, this constructor uses the following implementation of {@link KafkaSpoutRetryService}:<p/>
+         * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+         *           DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
+         */
+        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
+            this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
+                    new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+                            DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
+        }
 
         /***
          * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
          * The optional configuration can be specified using the set methods of this builder
          * @param kafkaProps    properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
          * @param kafkaSpoutStreams    streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
+         * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
+         * @param retryService  logic that manages the retrial of failed tuples
          */
-        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams) {
+        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
             if (kafkaProps == null || kafkaProps.isEmpty()) {
-                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required. " + kafkaProps);
+                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
             }
 
             if (kafkaSpoutStreams == null)  {
-                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit in the same stream.");
+                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
+            }
+
+            if (tuplesBuilder == null) {
+                throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
             }
+
+            if (retryService == null) {
+                throw new IllegalArgumentException("Must specify at implementation of retry service");
+            }
+
             this.kafkaProps = kafkaProps;
             this.kafkaSpoutStreams = kafkaSpoutStreams;
+            this.tuplesBuilder = tuplesBuilder;
+            this.retryService = retryService;
         }
 
         /**
@@ -214,16 +228,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
-        /**
-         * Sets the strategy used by the the Kafka spout to decide when to poll the next batch of records from Kafka.
-         * Please refer to to the documentation in {@link PollStrategy}
-         * @param pollStrategy strategy used to decide when to poll
-         * */
-        public Builder<K, V> setPollStrategy(PollStrategy pollStrategy) {
-            this.pollStrategy = pollStrategy;
-            return this;
-        }
-
         public KafkaSpoutConfig<K,V> build() {
             return new KafkaSpoutConfig<>(this);
         }
@@ -258,6 +262,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return (String) kafkaProps.get(Consumer.GROUP_ID);
     }
 
+    /**
+     * @return list of subscribed topics, each of which emits tuples to a stream as configured by {@link KafkaSpoutStream}
+     */
     public List<String> getSubscribedTopics() {
         return new ArrayList<>(kafkaSpoutStreams.getTopics());
     }
@@ -278,8 +285,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return maxUncommittedOffsets;
     }
 
-    public PollStrategy getPollStrategy() {
-        return pollStrategy;
+    public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() {
+        return tuplesBuilder;
+    }
+
+    public KafkaSpoutRetryService getRetryService() {
+        return retryService;
     }
 
     @Override
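
For reference, a minimal sketch of how a topology wires up this new builder, mirroring the test topology at the end of this commit (the getKafka* helper methods are assumed to be defined as they are there):

    KafkaSpoutConfig<String, String> config =
            new KafkaSpoutConfig.Builder<String, String>(
                    getKafkaConsumerProps(),   // consumer connection properties
                    getKafkaSpoutStreams(),    // topic-to-stream mapping and output fields
                    getTuplesBuilder(),        // ConsumerRecord-to-tuple logic
                    getRetryService())         // retry policy for failed tuples
                    .setOffsetCommitPeriodMs(10_000)
                    .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST)
                    .setMaxUncommittedOffsets(250)
                    .build();

    KafkaSpout<String, String> spout = new KafkaSpout<>(config);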

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 0a6b126..71f8327 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -21,23 +21,18 @@ package org.apache.storm.kafka.spout;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.Collections;
-import java.util.List;
-
 public class KafkaSpoutMessageId {
     private transient TopicPartition topicPart;
     private transient long offset;
-    private transient List<Object> tuple;
     private transient int numFails = 0;
 
-    public KafkaSpoutMessageId(ConsumerRecord consumerRecord, List<Object> tuple) {
-        this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), tuple);
+    public KafkaSpoutMessageId(ConsumerRecord consumerRecord) {
+        this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
     }
 
-    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, List<Object> tuple) {
+    public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
         this.topicPart = topicPart;
         this.offset = offset;
-        this.tuple = tuple;
     }
 
     public int partition() {
@@ -64,10 +59,6 @@ public class KafkaSpoutMessageId {
         return topicPart;
     }
 
-    public List<Object> getTuple() {
-        return Collections.unmodifiableList(tuple);
-    }
-
     public String getMetadata(Thread currThread) {
         return "{" +
                 "topic-partition=" + topicPart +
@@ -83,7 +74,6 @@ public class KafkaSpoutMessageId {
                 "topic-partition=" + topicPart +
                 ", offset=" + offset +
                 ", numFails=" + numFails +
-                ", tuple=" + tuple +
                 '}';
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
new file mode 100644
index 0000000..208cef4
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1)    where failCount = 1, 2, 3, ...
+ * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ */
+public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
+    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
+
+    private TimeInterval initialDelay;
+    private TimeInterval delayPeriod;
+    private TimeInterval maxDelay;
+    private int maxRetries;
+
+    private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+    private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speed up lookups
+
+    /**
+     * Comparator ordering by timestamp 
+     */
+    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
+            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+        }
+    }
+
+    private class RetrySchedule {
+        private KafkaSpoutMessageId msgId;
+        private long nextRetryTimeNanos;
+
+        public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
+            this.msgId = msgId;
+            this.nextRetryTimeNanos = nextRetryTime;
+            LOG.debug("Created {}", this);
+        }
+
+        public void setNextRetryTime() {
+            nextRetryTimeNanos = nextTime(msgId);
+            LOG.debug("Updated {}", this);
+        }
+
+        public boolean retry(long currentTimeNanos) {
+            return nextRetryTimeNanos <= currentTimeNanos;
+        }
+
+        @Override
+        public String toString() {
+            return "RetrySchedule{" +
+                    "msgId=" + msgId +
+                    ", nextRetryTime=" + nextRetryTimeNanos +
+                    '}';
+        }
+
+        public KafkaSpoutMessageId msgId() {
+            return msgId;
+        }
+
+        public long nextRetryTimeNanos() {
+            return nextRetryTimeNanos;
+        }
+    }
+
+    public static class TimeInterval implements Serializable {
+        private long lengthNanos;
+        private long length;
+        private TimeUnit timeUnit;
+
+        /**
+         * @param length length of the time interval in the units specified by {@link TimeUnit}
+         * @param timeUnit unit of time used to specify the length of the interval
+         */
+        public TimeInterval(long length, TimeUnit timeUnit) {
+            this.length = length;
+            this.timeUnit = timeUnit;
+            this.lengthNanos = timeUnit.toNanos(length);
+        }
+
+        public static TimeInterval seconds(long length) {
+            return new TimeInterval(length, TimeUnit.SECONDS);
+        }
+
+        public static TimeInterval milliSeconds(long length) {
+            return new TimeInterval(length, TimeUnit.MILLISECONDS);
+        }
+
+        public static TimeInterval microSeconds(long length) {
+            return new TimeInterval(length, TimeUnit.MICROSECONDS);
+        }
+
+        public long lengthNanos() {
+            return lengthNanos;
+        }
+
+        public long length() {
+            return length;
+        }
+
+        public TimeUnit timeUnit() {
+            return timeUnit;
+        }
+
+        @Override
+        public String toString() {
+            return "TimeInterval{" +
+                    "length=" + length +
+                    ", timeUnit=" + timeUnit +
+                    '}';
+        }
+    }
+
+    /**
+     * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
+     * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
+     * nextRetry = Min(nextRetry, currentTime + maxDelay)
+     *
+     * @param initialDelay      initial delay of the first retry
+     * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
+     * @param maxRetries        maximum number of times a tuple is retried before being acked and scheduled for commit
+     * @param maxDelay          maximum amount of time waiting before retrying
+     *
+     */
+    public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
+        this.initialDelay = initialDelay;
+        this.delayPeriod = delayPeriod;
+        this.maxRetries = maxRetries;
+        this.maxDelay = maxDelay;
+        LOG.debug("Instantiated {}", this);
+    }
+
+    @Override
+    public Set<TopicPartition> retriableTopicPartitions() {
+        final Set<TopicPartition> tps = new TreeSet<>();
+        final long currentTimeNanos = System.nanoTime();
+        for (RetrySchedule retrySchedule : retrySchedules) {
+            if (retrySchedule.retry(currentTimeNanos)) {
+                final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+                tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
+            } else {
+                break;  // Entries are sorted by retry time; stop as soon as one is not yet due
+            }
+        }
+        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
+        return tps;
+    }
+
+    @Override
+    public boolean isReady(KafkaSpoutMessageId msgId) {
+        boolean retry = false;
+        if (toRetryMsgs.contains(msgId)) {
+            final long currentTimeNanos = System.nanoTime();
+            for (RetrySchedule retrySchedule : retrySchedules) {
+                if (retrySchedule.retry(currentTimeNanos)) {
+                    if (retrySchedule.msgId.equals(msgId)) {
+                        retry = true;
+                        LOG.debug("Found entry to retry {}", retrySchedule);
+                    }
+                } else {
+                    LOG.debug("Entry to retry not found {}", retrySchedule);
+                    break;  // Entries are sorted by retry time; stop as soon as one is not yet due
+                }
+            }
+        }
+        return retry;
+    }
+
+    @Override
+    public boolean isScheduled(KafkaSpoutMessageId msgId) {
+        return toRetryMsgs.contains(msgId);
+    }
+
+    @Override
+    public boolean remove(KafkaSpoutMessageId msgId) {
+        boolean removed = false;
+        if (toRetryMsgs.contains(msgId)) {
+            for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+                final RetrySchedule retrySchedule = iterator.next();
+                if (retrySchedule.msgId().equals(msgId)) {
+                    iterator.remove();
+                    toRetryMsgs.remove(msgId);
+                    removed = true;
+                    break;
+                }
+            }
+        }
+        LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId);
+        LOG.trace("Current state {}", retrySchedules);
+        return removed;
+    }
+
+    @Override
+    public boolean retainAll(Collection<TopicPartition> topicPartitions) {
+        boolean result = false;
+        for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
+            final RetrySchedule retrySchedule = rsIterator.next();
+            final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+            final TopicPartition tpRetry = new TopicPartition(msgId.topic(), msgId.partition());
+            if (!topicPartitions.contains(tpRetry)) {
+                rsIterator.remove();
+                toRetryMsgs.remove(msgId);
+                LOG.debug("Removed {}", retrySchedule);
+                LOG.trace("Current state {}", retrySchedules);
+                result = true;
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void schedule(KafkaSpoutMessageId msgId) {
+        if (msgId.numFails() > maxRetries) {
+            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+        } else {
+            if (toRetryMsgs.contains(msgId)) {
+                for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+                    final RetrySchedule retrySchedule = iterator.next();
+                    if (retrySchedule.msgId().equals(msgId)) {
+                        iterator.remove();
+                        toRetryMsgs.remove(msgId);
+                    }
+                }
+            }
+            final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
+            retrySchedules.add(retrySchedule);
+            toRetryMsgs.add(msgId);
+            LOG.debug("Scheduled. {}", retrySchedule);
+            LOG.trace("Current state {}", retrySchedules);
+        }
+    }
+
+    // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
+    private long nextTime(KafkaSpoutMessageId msgId) {
+        final long currentTimeNanos = System.nanoTime();
+        final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
+                ? currentTimeNanos + initialDelay.lengthNanos()
+                : (long) (currentTimeNanos + Math.pow(delayPeriod.lengthNanos, msgId.numFails() - 1));
+        return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutRetryExponentialBackoff{" +
+                "delay=" + initialDelay +
+                ", ratio=" + delayPeriod +
+                ", maxRetries=" + maxRetries +
+                ", maxRetryDelay=" + maxDelay +
+                '}';
+    }
+}
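
To make the backoff schedule concrete, take the defaults used by KafkaSpoutConfig.Builder (initialDelay = 0 s, delayPeriod = 2 ms, maxDelay = 10 s). Since nextTime() raises the delay period expressed in nanoseconds to the power (numFails - 1), the schedule saturates at maxDelay after very few failures:

    numFails = 1:  nextRetry = now + 0 ns                       (initialDelay)
    numFails = 2:  nextRetry = now + (2,000,000 ns)^1 = now + 2 ms
    numFails = 3:  nextRetry = now + (2,000,000 ns)^2 = now + ~4,000 s  -> capped at now + 10 s (maxDelay)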

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
new file mode 100644
index 0000000..5aab167
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Represents the logic that manages the retrial of failed tuples.
+ */
+public interface KafkaSpoutRetryService extends Serializable {
+    /**
+     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+     * @param msgId message to schedule for retrial
+     */
+    void schedule(KafkaSpoutMessageId msgId);
+
+    /**
+     * Removes a message from the list of messages scheduled for retrial
+     * @param msgId message to remove from retrial
+     * @return true if the message was removed, false if it was not scheduled
+     */
+    boolean remove(KafkaSpoutMessageId msgId);
+
+    /**
+     * Retains all the messages whose {@link TopicPartition} belongs to the specified {@code Collection<TopicPartition>}.
+     * All messages that come from a {@link TopicPartition} NOT existing in the collection will be removed.
+     * This method is useful to clean up state following a partition rebalance.
+     * @param topicPartitions Collection of {@link TopicPartition} for which to keep messages
+     * @return true if at least one message was removed, false otherwise
+     */
+    boolean retainAll(Collection<TopicPartition> topicPartitions);
+
+    /**
+     * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
+     * for which a tuple has failed and has retry time less than current time
+     */
+    Set<TopicPartition> retriableTopicPartitions();
+
+    /**
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
+     * i.e., it is scheduled and has a retry time that is less than the current time.
+     * @return true if message is ready to be retried, false otherwise
+     */
+    boolean isReady(KafkaSpoutMessageId msgId);
+
+    /**
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
+     * The message may or may not be ready to be retried yet.
+     * @return true if the message is scheduled to be retried, regardless of being or not ready to be retried.
+     * Returns false if this message is not scheduled for retrial
+     */
+    boolean isScheduled(KafkaSpoutMessageId msgId);
+}
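
To illustrate the contract, here is a minimal sketch of an alternative implementation (not part of this patch): a policy that never retries, so every failed tuple is simply dropped from the retry bookkeeping:

    import org.apache.kafka.common.TopicPartition;

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Set;

    public class NoRetryService implements KafkaSpoutRetryService {
        @Override
        public void schedule(KafkaSpoutMessageId msgId) { }     // never schedule a retry

        @Override
        public boolean remove(KafkaSpoutMessageId msgId) { return false; }     // nothing is ever scheduled

        @Override
        public boolean retainAll(Collection<TopicPartition> topicPartitions) { return false; }

        @Override
        public Set<TopicPartition> retriableTopicPartitions() { return Collections.emptySet(); }

        @Override
        public boolean isReady(KafkaSpoutMessageId msgId) { return false; }

        @Override
        public boolean isScheduled(KafkaSpoutMessageId msgId) { return false; }
    }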

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
index 43464a9..064a8bb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
@@ -31,27 +31,31 @@ public class KafkaSpoutStream implements Serializable {
     private final String streamId;
     private final String topic;
 
-    /** Declare specified outputFields with default stream for the specified topic */
+    /** Represents the specified outputFields and topic with the default stream */
     KafkaSpoutStream(Fields outputFields, String topic) {
         this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
     }
 
-    /** Declare specified outputFields with specified stream for the specified topic */
+    /** Represents the specified outputFields and topic with the specified stream */
     KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
+        if (outputFields == null || streamId == null || topic == null) {
+            throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
+                    "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic));
+        }
         this.outputFields = outputFields;
         this.streamId = streamId;
         this.topic = topic;
     }
 
-    public Fields getOutputFields() {
+    Fields getOutputFields() {
         return outputFields;
     }
 
-    public String getStreamId() {
+    String getStreamId() {
         return streamId;
     }
 
-    public String getTopic() {
+    String getTopic() {
         return topic;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
index 30215d1..dc5892e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -33,7 +33,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Represents the output streams associated with each topic, and provides a public API to
+ * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to
  * declare output streams and emit tuples on the appropriate stream for all the topics specified.
  */
 public class KafkaSpoutStreams implements Serializable {
@@ -48,7 +48,7 @@ public class KafkaSpoutStreams implements Serializable {
 
     /**
      * @param topic the topic for which to get output fields
-     * @return the output fields declared
+     * @return the declared output fields
      */
     public Fields getOutputFields(String topic) {
         if (topicToStream.containsKey(topic)) {
@@ -79,7 +79,7 @@ public class KafkaSpoutStreams implements Serializable {
         return new ArrayList<>(topicToStream.keySet());
     }
 
-    void declareOutputFields(OutputFieldsDeclarer declarer) {
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         for (KafkaSpoutStream stream : topicToStream.values()) {
             if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) {
                 declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
@@ -88,8 +88,8 @@ public class KafkaSpoutStreams implements Serializable {
         }
     }
 
-    void emit(SpoutOutputCollector collector, KafkaSpoutMessageId messageId) {
-        collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
+    public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
+        collector.emit(getStreamId(messageId.topic()), tuple, messageId);
     }
 
     @Override
@@ -103,11 +103,11 @@ public class KafkaSpoutStreams implements Serializable {
         private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();
 
         /**
-         * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified.
-         * All the topics will have the same stream id and output fields.
+         * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified.
+         * All topics will have the same stream id and output fields.
          */
         public Builder(Fields outputFields, String... topics) {
-            this(outputFields, Utils.DEFAULT_STREAM_ID, topics);
+            addStream(outputFields, topics);
         }
 
         /**
@@ -115,16 +115,14 @@ public class KafkaSpoutStreams implements Serializable {
          * All the topics will have the same stream id and output fields.
          */
         public Builder (Fields outputFields, String streamId, String... topics) {
-            for (String topic : topics) {
-                topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
-            }
+            addStream(outputFields, streamId, topics);
         }
 
         /**
          * Adds this stream to the state representing the streams associated with each topic
          */
         public Builder(KafkaSpoutStream stream) {
-            topicToStream.put(stream.getTopic(), stream);
+            addStream(stream);
         }
 
         /**
@@ -139,9 +137,7 @@ public class KafkaSpoutStreams implements Serializable {
          * Please refer to javadoc in {@link #Builder(Fields, String...)}
          */
         public Builder addStream(Fields outputFields, String... topics) {
-            for (String topic : topics) {
-                topicToStream.put(topic, new KafkaSpoutStream(outputFields, topic));
-            }
+            addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics);
             return this;
         }
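
As an example of the resulting API, a mapping where two topics share one stream and a third topic emits to a separate stream with different output fields (topic and stream names are illustrative) can be built as:

    KafkaSpoutStreams streams = new KafkaSpoutStreams.Builder(
            new Fields("topic", "partition", "offset", "key", "value"), "stream_a", new String[]{"topic0", "topic1"})
            .addStream(new Fields("topic", "partition", "offset"), "stream_b", "topic2")
            .build();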
 

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
index 45aab48..3bb71a8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
@@ -21,8 +21,38 @@ package org.apache.storm.kafka.spout;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
-public interface KafkaSpoutTupleBuilder<K,V> extends Serializable {
-    List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams);
+/**
+ * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to build tuples from {@link ConsumerRecord}s.
+ * Users must subclass this abstract class to provide their implementation. See also {@link KafkaSpoutTuplesBuilder}
+ */
+public abstract class KafkaSpoutTupleBuilder<K,V> implements Serializable {
+    private List<String> topics;
+
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public KafkaSpoutTupleBuilder(String... topics) {
+        if (topics == null || topics.length == 0) {
+            throw new IllegalArgumentException("Must specify at least one topic. It cannot be null or empty");
+        }
+        this.topics = Arrays.asList(topics);
+    }
+
+    /**
+     * @return list of topics that use this implementation to build tuples
+     */
+    public List<String> getTopics() {
+        return Collections.unmodifiableList(topics);
+    }
+
+    /**
+     * Builds a list of tuples using the ConsumerRecord specified as parameter
+     * @param consumerRecord whose contents are used to build tuples
+     * @return list of tuples
+     */
+    public abstract List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord);
 }
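
A minimal sketch of such a subclass (names are illustrative; fuller examples are added to the tests at the end of this commit):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
    import org.apache.storm.tuple.Values;

    import java.util.List;

    public class KeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
        public KeyValueTupleBuilder(String... topics) {
            super(topics);    // topics whose records this builder turns into tuples
        }

        @Override
        public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
            // The values emitted here must line up with the Fields declared in KafkaSpoutStreams
            return new Values(consumerRecord.key(), consumerRecord.value());
        }
    }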

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
new file mode 100644
index 0000000..d67c69d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s.
+ * The logic is provided by the user by implementing the appropriate number of {@link KafkaSpoutTupleBuilder} instances
+ */
+public class KafkaSpoutTuplesBuilder<K,V> implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilder.class);
+
+    private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+    private KafkaSpoutTuplesBuilder(Builder<K,V> builder) {
+        this.topicToTupleBuilders = builder.topicToTupleBuilders;
+        LOG.debug("Instantiated {}", this);
+    }
+
+    public static class Builder<K,V> {
+        private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders;
+        private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+        @SafeVarargs
+        public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) {
+            if (tupleBuilders == null || tupleBuilders.length == 0) {
+                throw new IllegalArgumentException("Must specify at least one tuple builder per topic declared in KafkaSpoutStreams");
+            }
+
+            this.tupleBuilders = Arrays.asList(tupleBuilders);
+            topicToTupleBuilders = new HashMap<>();
+        }
+
+        public KafkaSpoutTuplesBuilder<K,V> build() {
+            for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) {
+                for (String topic : tupleBuilder.getTopics()) {
+                    if (!topicToTupleBuilders.containsKey(topic)) {
+                        topicToTupleBuilders.put(topic, tupleBuilder);
+                    }
+                }
+            }
+            return new KafkaSpoutTuplesBuilder<>(this);
+        }
+    }
+
+    public List<Object> buildTuple(ConsumerRecord<K,V> consumerRecord) {
+        final String topic = consumerRecord.topic();
+        return topicToTupleBuilders.get(topic).buildTuple(consumerRecord);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutTuplesBuilder{" +
+                "topicToTupleBuilders=" + topicToTupleBuilders +
+                '}';
+    }
+}
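
Putting the two classes together, in the style of the test topology below, one composite builder can serve several topics while another handles a topic with a different schema (topic names are illustrative):

    KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
            new KafkaSpoutTuplesBuilder.Builder<>(
                    new TopicsTest0Test1TupleBuilder<String, String>("test", "test1"),   // emits topic, partition, offset, key, value
                    new TopicTest2TupleBuilder<String, String>("test2"))                 // emits topic, partition, offset
                    .build();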

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
new file mode 100644
index 0000000..7a94a50
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KafkaSpoutTestBolt extends BaseRichBolt {
+    protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestBolt.class);
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        LOG.debug("input = [" + input + "]");
+        collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
index 4fcc3ef..0691dd3 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
@@ -22,11 +22,13 @@ import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
 import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 
@@ -35,9 +37,9 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy.STREAM;
 
 public class KafkaSpoutTopologyMain {
     private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
@@ -80,21 +82,29 @@ public class KafkaSpoutTopologyMain {
 
     public static StormTopology getTopolgyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()), getTupleBuilder()), 1);
-        tp.setBolt("kafka_bolt", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
-        tp.setBolt("kafka_bolt_1", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+        tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
         return tp.createTopology();
     }
 
     public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
-        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams)
+        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
                 .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)
-                .setPollStrategy(STREAM)
                 .setMaxUncommittedOffsets(250)
                 .build();
     }
 
+    private static KafkaSpoutRetryService getRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, TimeUnit.MICROSECONDS),
+                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
+    }
+
+    private static TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) {
+        return new TimeInterval(delay, timeUnit);
+    }
+
     public static Map<String,Object> getKafkaConsumerProps() {
         Map<String, Object> props = new HashMap<>();
 //        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
@@ -105,16 +115,19 @@ public class KafkaSpoutTopologyMain {
         return props;
     }
 
-    public static KafkaSpoutTupleBuilder<String,String> getTupleBuilder() {
-        return new KafkaRecordTupleBuilder<>();
+    public static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+        return new KafkaSpoutTuplesBuilder.Builder<>(
+                new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
+                new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
+                .build();
     }
 
     public static KafkaSpoutStreams getKafkaSpoutStreams() {
         final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
         final Fields outputFields1 = new Fields("topic", "partition", "offset");
         return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to test_stream
-                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents topic test2 sent to test_stream
-                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents topic test2 sent to test_stream2
+                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
+                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
                 .build();
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
deleted file mode 100644
index c9ff9d5..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class KafkaTestBolt extends BaseRichBolt {
-    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBolt.class);
-
-
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        LOG.debug("input = [" + input + "]");
-        collector.ack(input);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
new file mode 100644
index 0000000..ca65177
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public TopicTest2TupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        return new Values(consumerRecord.topic(),
+                consumerRecord.partition(),
+                consumerRecord.offset());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
new file mode 100644
index 0000000..4c55aa1
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public TopicsTest0Test1TupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        return new Values(consumerRecord.topic(),
+                consumerRecord.partition(),
+                consumerRecord.offset(),
+                consumerRecord.key(),
+                consumerRecord.value());
+    }
+}
\ No newline at end of file