Posted to commits@storm.apache.org by bo...@apache.org on 2016/03/31 21:00:09 UTC

[2/3] storm git commit: STORM-822: Kafka Spout New Consumer API - Refactored code to avoid keeping records data inside spout state - Refactored code to specify output fields per stream and build tuples per topic - Implement exponential backoff retry strategy

STORM-822: Kafka Spout New Consumer API
 - Refactored code to avoid keeping records data inside spout state
 - Refactored code to specify output fields per stream and build tuples per topic
 - Implement exponential backoff retry strategy
 - Send one tuple per call to nextTuple

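For reference, a minimal sketch of how a topology might wire the refactored spout together, based on the constructors and builders introduced in the diff below. The broker address, group id, topic name, field names, and the MyTupleBuilder class are illustrative assumptions, and KafkaSpoutStreams.Builder#build() is assumed from the full class (it lies outside the hunks shown here):

    // assumes the usual imports from org.apache.storm.*, org.apache.storm.kafka.spout.* and java.util.*
    Map<String, Object> kafkaProps = new HashMap<>();
    kafkaProps.put("bootstrap.servers", "localhost:9092");        // illustrative broker address
    kafkaProps.put("group.id", "kafkaSpoutTestGroup");            // illustrative consumer group

    KafkaSpoutStreams streams = new KafkaSpoutStreams.Builder(
            new Fields("topic", "partition", "offset", "key", "value"), "test-topic").build();

    // MyTupleBuilder is a hypothetical KafkaSpoutTupleBuilder subclass handling "test-topic"
    KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
            new KafkaSpoutTuplesBuilder.Builder<String, String>(new MyTupleBuilder("test-topic")).build();

    KafkaSpoutRetryService retryService = new KafkaSpoutRetryExponentialBackoff(
            TimeInterval.seconds(0), TimeInterval.milliSeconds(2), 5, TimeInterval.seconds(10));

    KafkaSpoutConfig<String, String> config = new KafkaSpoutConfig.Builder<String, String>(
            kafkaProps, streams, tuplesBuilder, retryService).build();

    new TopologyBuilder().setSpout("kafka-spout", new KafkaSpout<>(config), 1);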

Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/332afc40
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/332afc40
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/332afc40

Branch: refs/heads/master
Commit: 332afc40d8ee7c1f4a4a747280ff83a92c279c5d
Parents: d26b984
Author: Hugo Louro <hm...@gmail.com>
Authored: Mon Mar 21 14:42:50 2016 -0700
Committer: Robert (Bobby) Evans <ev...@yahoo-inc.com>
Committed: Thu Mar 31 13:59:32 2016 -0500

----------------------------------------------------------------------
 .../kafka/spout/KafkaRecordTupleBuilder.java    |  44 ---
 .../apache/storm/kafka/spout/KafkaSpout.java    | 160 +++++++----
 .../storm/kafka/spout/KafkaSpoutConfig.java     |  87 +++---
 .../storm/kafka/spout/KafkaSpoutMessageId.java  |  16 +-
 .../KafkaSpoutRetryExponentialBackoff.java      | 281 +++++++++++++++++++
 .../kafka/spout/KafkaSpoutRetryService.java     |  72 +++++
 .../storm/kafka/spout/KafkaSpoutStream.java     |  14 +-
 .../storm/kafka/spout/KafkaSpoutStreams.java    |  26 +-
 .../kafka/spout/KafkaSpoutTupleBuilder.java     |  34 ++-
 .../kafka/spout/KafkaSpoutTuplesBuilder.java    |  82 ++++++
 .../kafka/spout/test/KafkaSpoutTestBolt.java    |  50 ++++
 .../spout/test/KafkaSpoutTopologyMain.java      |  37 ++-
 .../storm/kafka/spout/test/KafkaTestBolt.java   |  52 ----
 .../spout/test/TopicTest2TupleBuilder.java      |  40 +++
 .../test/TopicsTest0Test1TupleBuilder.java      |  42 +++
 15 files changed, 798 insertions(+), 239 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
deleted file mode 100644
index 4d67632..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaRecordTupleBuilder.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class KafkaRecordTupleBuilder<K, V> implements KafkaSpoutTupleBuilder<K, V> {
-    @Override
-    public List<Object> buildTuple(final ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams) {
-        final Fields outputFields = kafkaSpoutStreams.getOutputFields(consumerRecord.topic());
-        if (outputFields != null) {
-            if (outputFields.size() == 3) {
-                return new Values(consumerRecord.topic(),
-                        consumerRecord.partition(),
-                        consumerRecord.offset());
-            } else if (outputFields.size() == 5) {
-                return new Values(consumerRecord.topic(),
-                        consumerRecord.partition(),
-                        consumerRecord.offset(),
-                        consumerRecord.key(),
-                        consumerRecord.value());
-            }
-        }
-        throw new RuntimeException("Failed to build tuple. " + consumerRecord + " " + kafkaSpoutStreams);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 9a49ee8..d211ae9 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -25,7 +25,6 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.topology.OutputFieldsDeclarer;
@@ -34,12 +33,16 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.util.Collection;
+import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NavigableSet;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
 
@@ -62,23 +65,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
 
     // Bookkeeping
-    private KafkaSpoutStreams kafkaSpoutStreams;
-    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
-    private transient Timer commitTimer;                                    // timer == null for auto commit mode
-    private transient Timer logTimer;
-    private transient Map<TopicPartition, OffsetEntry> acked;         // emitted tuples that were successfully acked. These tuples will be committed periodically when the timer expires, on consumer rebalance, or on close/deactivate
-    private transient int maxRetries;                                 // Max number of times a tuple is retried
-    private transient boolean initialized;          // Flag indicating that the spout is still undergoing initialization process.
+    private transient int maxRetries;                                   // Max number of times a tuple is retried
+    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;  // Strategy to determine the fetch offset of the first poll performed by the spout upon activation
+    private transient KafkaSpoutRetryService retryService;              // Class that has the logic to handle tuple failure
+    private transient Timer commitTimer;                                // timer == null for auto commit mode
+    private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
-    private transient long numUncommittedOffsets;   // Number of offsets that have been polled and emitted but not yet been committed
-    private transient FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private transient PollStrategy pollStrategy;
 
+    private KafkaSpoutStreams kafkaSpoutStreams;                        // Object that wraps all the logic to declare output fields and emit tuples
+    private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;      // Object that contains the logic to build tuples for each ConsumerRecord
 
-    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
+    private transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
+    private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
+    private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
+    private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
+
+
+    public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
         this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
-        this.tupleBuilder = tupleBuilder;
     }
 
     @Override
@@ -89,18 +94,25 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         this.collector = collector;
         maxRetries = kafkaSpoutConfig.getMaxTupleRetries();
         numUncommittedOffsets = 0;
-        logTimer = new Timer(500, Math.min(1000, kafkaSpoutConfig.getOffsetsCommitPeriodMs()/2), TimeUnit.MILLISECONDS);
 
         // Offset management
         firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
-        pollStrategy = kafkaSpoutConfig.getPollStrategy();
         consumerAutoCommitMode = kafkaSpoutConfig.isConsumerAutoCommitMode();
 
+        // Retries management
+        retryService = kafkaSpoutConfig.getRetryService();
+
+        // Tuples builder delegate
+        tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+
         if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
             commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
-            acked = new HashMap<>();
         }
 
+        acked = new HashMap<>();
+        emitted = new HashSet<>();
+        waitingToEmit = Collections.emptyListIterator();
+
         LOG.info("Kafka Spout opened with the following configuration: {}", kafkaSpoutConfig);
     }
 
@@ -130,6 +142,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 acked.keySet().retainAll(partitions);   // remove from acked all partitions that are no longer assigned to this spout
             }
 
+            retryService.retainAll(partitions);
+
             for (TopicPartition tp : partitions) {
                 final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
                 final long fetchOffset = doSeek(tp, committedOffset);
@@ -182,67 +196,88 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         if (initialized) {
             if (commit()) {
                 commitOffsetsForAckedTuples();
-            } else if (poll()) {
-                emitTuples(pollKafkaBroker());
-            } else if (logTimer.isExpiredResetOnTrue()) {   // to limit the number of messages that get printed.
-                log();
+            }
+
+            if (poll()) {
+                setWaitingToEmit(pollKafkaBroker());
+            }
+
+            if (waitingToEmit()) {
+                emit();
             }
         } else {
             LOG.debug("Spout not initialized. Not sending tuples until initialization completes");
         }
     }
 
-    private void log() {
-        switch(pollStrategy) {
-            case STREAM:
-                LOG.trace("Reached the maximum number number of uncommitted records [{}]. " +
-                        "No more polls will occur until a sequence of commits sets the count under the [{}] threshold ",
-                        numUncommittedOffsets, kafkaSpoutConfig.getMaxUncommittedOffsets());
-                break;
-            case BATCH:
-                LOG.trace("No more polls will occur until the last batch completes. [{}] emitted tuples pending", numUncommittedOffsets);
-                break;
-            default:
-                throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
-        }
+    private boolean commit() {
+        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+    }
 
+    private boolean poll() {
+        return !waitingToEmit() && numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
     }
 
-    // always poll in auto commit mode because no state is kept and therefore there is no need to set an upper limit in memory
-    private boolean poll()  {
-        switch(pollStrategy) {
-            case STREAM:
-                return consumerAutoCommitMode || numUncommittedOffsets < kafkaSpoutConfig.getMaxUncommittedOffsets();
-            case BATCH:
-                return consumerAutoCommitMode || numUncommittedOffsets <= 0;
-            default:
-                throw new IllegalStateException("No implementation defined for polling strategy " + pollStrategy);
-        }
+    private boolean waitingToEmit() {
+        return waitingToEmit != null && waitingToEmit.hasNext();
     }
 
-    private boolean commit() {
-        return !consumerAutoCommitMode && commitTimer.isExpiredResetOnTrue();    // timer != null for non auto commit mode
+    public void setWaitingToEmit(ConsumerRecords<K,V> consumerRecords) {
+        List<ConsumerRecord<K,V>> waitingToEmitList = new LinkedList<>();
+        for (TopicPartition tp : consumerRecords.partitions()) {
+            waitingToEmitList.addAll(consumerRecords.records(tp));
+        }
+        waitingToEmit = waitingToEmitList.iterator();
+        LOG.trace("Records waiting to be emitted {}", waitingToEmitList);
     }
 
+    // ======== poll =========
     private ConsumerRecords<K, V> pollKafkaBroker() {
+        doSeekRetriableTopicPartitions();
+
         final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
         final int numPolledRecords = consumerRecords.count();
-        numUncommittedOffsets+= numPolledRecords;
         LOG.debug("Polled [{}] records from Kafka. NumUncommittedOffsets=[{}]", numPolledRecords, numUncommittedOffsets);
         return consumerRecords;
     }
 
-    private void emitTuples(ConsumerRecords<K, V> consumerRecords) {
-        for (TopicPartition tp : consumerRecords.partitions()) {
-            final Iterable<ConsumerRecord<K, V>> records = consumerRecords.records(tp.topic());
+    private void doSeekRetriableTopicPartitions() {
+        final Set<TopicPartition> retriableTopicPartitions = retryService.retriableTopicPartitions();
+
+        for (TopicPartition rtp : retriableTopicPartitions) {
+            final OffsetAndMetadata offsetAndMeta = acked.get(rtp).findNextCommitOffset();
+            if (offsetAndMeta != null) {
+                kafkaConsumer.seek(rtp, offsetAndMeta.offset() + 1);  // seek to the next offset that is ready to commit in next commit cycle
+            } else {
+                kafkaConsumer.seekToEnd(rtp);    // nothing acked yet for this partition; seek to the end of the partition
+            }
+        }
+    }
 
-            for (final ConsumerRecord<K, V> record : records) {
-                final List<Object> tuple = tupleBuilder.buildTuple(record, kafkaSpoutStreams);
-                final KafkaSpoutMessageId messageId = new KafkaSpoutMessageId(record, tuple);
+    // ======== emit  =========
+    private void emit() {
+        emitTupleIfNotEmitted(waitingToEmit.next());
+        waitingToEmit.remove();
+    }
 
-                kafkaSpoutStreams.emit(collector, messageId);           // emits one tuple per record
-                LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
+    // emits one tuple per record
+    private void emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
+        final TopicPartition tp = new TopicPartition(record.topic(), record.partition());
+        final KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+
+        if (acked.containsKey(tp) && acked.get(tp).contains(msgId)) {   // has been acked
+            LOG.trace("Tuple for record [{}] has already been acked. Skipping", record);
+        } else if (emitted.contains(msgId)) {   // has been emitted and it's pending ack or fail
+            LOG.trace("Tuple for record [{}] has already been emitted. Skipping", record);
+        } else if (!retryService.isScheduled(msgId) || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
+            final List<Object> tuple = tuplesBuilder.buildTuple(record);
+            kafkaSpoutStreams.emit(collector, tuple, msgId);
+            emitted.add(msgId);
+            numUncommittedOffsets++;
+            if (retryService.isReady(msgId)) { // has failed. Is it ready for retry ?
+                retryService.remove(msgId);  // re-emitted hence remove from failed
             }
+            LOG.trace("Emitted tuple [{}] for record [{}]", tuple, record);
         }
     }
 
@@ -275,11 +310,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public void ack(Object messageId) {
+        final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
         if (!consumerAutoCommitMode) {  // Only need to keep track of acked tuples if commits are not done automatically
-            final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
             acked.get(msgId.getTopicPartition()).add(msgId);
             LOG.trace("Acked message [{}]. Messages acked and pending commit [{}]", msgId, acked);
         }
+        emitted.remove(msgId);
     }
 
     // ======== Fail =======
@@ -287,10 +323,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     @Override
     public void fail(Object messageId) {
         final KafkaSpoutMessageId msgId = (KafkaSpoutMessageId) messageId;
+        emitted.remove(msgId);
         if (msgId.numFails() < maxRetries) {
             msgId.incrementNumFails();
-            kafkaSpoutStreams.emit(collector, msgId);
-            LOG.trace("Retried tuple with message id [{}]", msgId);
+            retryService.schedule(msgId);
         } else { // limit to max number of retries
             LOG.debug("Reached maximum number of retries. Message [{}] being marked as acked.", msgId);
             ack(msgId);
@@ -367,7 +403,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             this.tp = tp;
             this.initialFetchOffset = initialFetchOffset;
             this.committedOffset = initialFetchOffset - 1;
-            LOG.debug("Created OffsetEntry. {}", this);
+            LOG.debug("Instantiated {}", this);
         }
 
         public void add(KafkaSpoutMessageId msgId) {          // O(Log N)
@@ -434,6 +470,14 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             return ackedMsgs.isEmpty();
         }
 
+        public boolean contains(ConsumerRecord record) {
+            return contains(new KafkaSpoutMessageId(record));
+        }
+
+        public boolean contains(KafkaSpoutMessageId msgId) {
+            return ackedMsgs.contains(msgId);
+        }
+
         @Override
         public String toString() {
             return "OffsetEntry{" +

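The refactored nextTuple() above replaces the old emit-the-whole-batch loop: it commits when the commit timer fires, polls only when the in-memory buffer is drained and numUncommittedOffsets is below the cap, and then emits at most one tuple per call. A standalone sketch of that poll-and-buffer pattern (simplified names and a hard-coded poll timeout, not the actual spout class):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    import java.util.Collections;
    import java.util.Iterator;
    import java.util.LinkedList;
    import java.util.List;

    class OneTuplePerCallSketch {
        private final KafkaConsumer<String, String> consumer;
        private Iterator<ConsumerRecord<String, String>> waitingToEmit = Collections.emptyListIterator();

        OneTuplePerCallSketch(KafkaConsumer<String, String> consumer) {
            this.consumer = consumer;
        }

        void nextTuple() {
            if (!waitingToEmit.hasNext()) {                              // buffer drained: poll a new batch
                final ConsumerRecords<String, String> polled = consumer.poll(200);  // real spout uses getPollTimeoutMs()
                final List<ConsumerRecord<String, String>> batch = new LinkedList<>();
                for (TopicPartition tp : polled.partitions()) {
                    batch.addAll(polled.records(tp));
                }
                waitingToEmit = batch.iterator();
            }
            if (waitingToEmit.hasNext()) {                               // emit at most one record per call
                emit(waitingToEmit.next());
            }
        }

        void emit(ConsumerRecord<String, String> record) {
            // in the real spout this builds the tuple and hands it to the SpoutOutputCollector
        }
    }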
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index d969f1f..29cedb2 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,7 +18,9 @@
 
 package org.apache.storm.kafka.spout;
 
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
 
 import java.io.Serializable;
 import java.util.ArrayList;
@@ -63,24 +65,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         UNCOMMITTED_EARLIEST,
         UNCOMMITTED_LATEST }
 
-    /**
-     * Defines when to poll the next batch of records from Kafka. The choice of this parameter will affect throughput and the memory
-     * footprint of the Kafka spout. The allowed values are STREAM and BATCH. STREAM will likely have higher throughput and use more memory
-     * (it stores in memory the entire KafkaRecord,including data). BATCH will likely have less throughput but also use less memory.
-     * The BATCH behavior is similar to the behavior of the previous Kafka Spout. De default value is STREAM.
-     * <ul>
-     *     <li>STREAM Every periodic call to nextTuple polls a new batch of records from Kafka as long as the maxUncommittedOffsets
-     *     threshold has not yet been reached. When the threshold his reached, no more records are polled until enough offsets have been
-     *     committed, such that the number of pending offsets is less than maxUncommittedOffsets. See {@link Builder#setMaxUncommittedOffsets(int)}
-     *     </li>
-     *     <li>BATCH Only polls a new batch of records from kafka once all the records that came in the previous poll have been acked.</li>
-     * </ul>
-     */
-    public enum PollStrategy {
-        STREAM,
-        BATCH
-    }
-
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
     private final Deserializer<K> keyDeserializer;
@@ -92,8 +76,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     private final int maxRetries;
     private final int maxUncommittedOffsets;
     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private final PollStrategy pollStrategy;
     private final KafkaSpoutStreams kafkaSpoutStreams;
+    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
+    private final KafkaSpoutRetryService retryService;
 
     private KafkaSpoutConfig(Builder<K,V> builder) {
         this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
@@ -103,9 +88,10 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
         this.maxRetries = builder.maxRetries;
         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-        this.pollStrategy = builder.pollStrategy;
         this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+        this.tuplesBuilder = builder.tuplesBuilder;
+        this.retryService = builder.retryService;
     }
 
     private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
@@ -117,33 +103,61 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     }
 
     public static class Builder<K,V> {
-        private Map<String, Object> kafkaProps;
+        private final Map<String, Object> kafkaProps;
         private Deserializer<K> keyDeserializer;
         private Deserializer<V> valueDeserializer;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
         private int maxRetries = DEFAULT_MAX_RETRIES;
         private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-        private KafkaSpoutStreams kafkaSpoutStreams;
+        private final KafkaSpoutStreams kafkaSpoutStreams;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
-        private PollStrategy pollStrategy = PollStrategy.STREAM;
+        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
+        private final KafkaSpoutRetryService retryService;
+
+        /**
+         * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
+         * By default, this constructor uses the following implementation of {@link KafkaSpoutRetryService}:<p/>
+         * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+         *           DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
+         */
+        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
+            this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
+                    new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+                            DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
+        }
 
         /***
          * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the topics to subscribe to.
          * The optional configuration can be specified using the set methods of this builder.
          * @param kafkaProps    properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
          * @param kafkaSpoutStreams    streams to which the tuples are emitted for each record. Multiple topics can emit to the same stream.
+         * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
+         * @param retryService  logic that manages the retrial of failed tuples
          */
-        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams) {
+        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
+                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
             if (kafkaProps == null || kafkaProps.isEmpty()) {
-                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required. " + kafkaProps);
+                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
             }
 
             if (kafkaSpoutStreams == null)  {
-                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit in the same stream.");
+                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
+            }
+
+            if (tuplesBuilder == null) {
+                throw new IllegalArgumentException("Must specify at least one tuple builder per topic declared in KafkaSpoutStreams");
             }
+
+            if (retryService == null) {
+                throw new IllegalArgumentException("Must specify an implementation of the retry service");
+            }
+
             this.kafkaProps = kafkaProps;
             this.kafkaSpoutStreams = kafkaSpoutStreams;
+            this.tuplesBuilder = tuplesBuilder;
+            this.retryService = retryService;
         }
 
         /**
@@ -214,16 +228,6 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             return this;
         }
 
-        /**
-         * Sets the strategy used by the the Kafka spout to decide when to poll the next batch of records from Kafka.
-         * Please refer to to the documentation in {@link PollStrategy}
-         * @param pollStrategy strategy used to decide when to poll
-         * */
-        public Builder<K, V> setPollStrategy(PollStrategy pollStrategy) {
-            this.pollStrategy = pollStrategy;
-            return this;
-        }
-
         public KafkaSpoutConfig<K,V> build() {
             return new KafkaSpoutConfig<>(this);
         }
@@ -258,6 +262,9 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return (String) kafkaProps.get(Consumer.GROUP_ID);
     }
 
+    /**
+     * @return the list of subscribed topics, each of which emits tuples to a stream as configured by {@link KafkaSpoutStream}
+     */
     public List<String> getSubscribedTopics() {
         return new ArrayList<>(kafkaSpoutStreams.getTopics());
     }
@@ -278,8 +285,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return maxUncommittedOffsets;
     }
 
-    public PollStrategy getPollStrategy() {
-        return pollStrategy;
+    public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() {
+        return tuplesBuilder;
+    }
+
+    public KafkaSpoutRetryService getRetryService() {
+        return retryService;
     }
 
     @Override

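As the new Builder javadoc above states, the three-argument constructor falls back to an exponential backoff retry service; the two forms below are therefore equivalent (the explicit arguments are quoted directly from the delegation in the constructor body):

    // Short form: retry service defaults to exponential backoff.
    new KafkaSpoutConfig.Builder<>(kafkaProps, kafkaSpoutStreams, tuplesBuilder);

    // Equivalent explicit form.
    new KafkaSpoutConfig.Builder<>(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
            new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
                    DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));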
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 0a6b126..71f8327 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -21,23 +21,18 @@ package org.apache.storm.kafka.spout;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
-import java.util.Collections;
-import java.util.List;
-
 public class KafkaSpoutMessageId {
     private transient TopicPartition topicPart;
     private transient long offset;
-    private transient List<Object> tuple;
     private transient int numFails = 0;
 
-    public KafkaSpoutMessageId(ConsumerRecord consumerRecord, List<Object> tuple) {
-        this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), tuple);
+    public KafkaSpoutMessageId(ConsumerRecord consumerRecord) {
+        this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
     }
 
-    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, List<Object> tuple) {
+    public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
         this.topicPart = topicPart;
         this.offset = offset;
-        this.tuple = tuple;
     }
 
     public int partition() {
@@ -64,10 +59,6 @@ public class KafkaSpoutMessageId {
         return topicPart;
     }
 
-    public List<Object> getTuple() {
-        return Collections.unmodifiableList(tuple);
-    }
-
     public String getMetadata(Thread currThread) {
         return "{" +
                 "topic-partition=" + topicPart +
@@ -83,7 +74,6 @@ public class KafkaSpoutMessageId {
                 "topic-partition=" + topicPart +
                 ", offset=" + offset +
                 ", numFails=" + numFails +
-                ", tuple=" + tuple +
                 '}';
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
new file mode 100644
index 0000000..208cef4
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+import java.util.TreeSet;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1)    where failCount = 1, 2, 3, ...
+ * nextRetry = Min(nextRetry, currentTime + maxDelay)
+ */
+public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
+    private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
+
+    private TimeInterval initialDelay;
+    private TimeInterval delayPeriod;
+    private TimeInterval maxDelay;
+    private int maxRetries;
+
+    private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+    private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
+
+    /**
+     * Comparator ordering by timestamp 
+     */
+    private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+        public int compare(RetrySchedule entry1, RetrySchedule entry2) {
+            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+        }
+    }
+
+    private class RetrySchedule {
+        private KafkaSpoutMessageId msgId;
+        private long nextRetryTimeNanos;
+
+        public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
+            this.msgId = msgId;
+            this.nextRetryTimeNanos = nextRetryTime;
+            LOG.debug("Created {}", this);
+        }
+
+        public void setNextRetryTime() {
+            nextRetryTimeNanos = nextTime(msgId);
+            LOG.debug("Updated {}", this);
+        }
+
+        public boolean retry(long currentTimeNanos) {
+            return nextRetryTimeNanos <= currentTimeNanos;
+        }
+
+        @Override
+        public String toString() {
+            return "RetrySchedule{" +
+                    "msgId=" + msgId +
+                    ", nextRetryTime=" + nextRetryTimeNanos +
+                    '}';
+        }
+
+        public KafkaSpoutMessageId msgId() {
+            return msgId;
+        }
+
+        public long nextRetryTimeNanos() {
+            return nextRetryTimeNanos;
+        }
+    }
+
+    public static class TimeInterval implements Serializable {
+        private long lengthNanos;
+        private long length;
+        private TimeUnit timeUnit;
+
+        /**
+         * @param length length of the time interval in the units specified by {@link TimeUnit}
+         * @param timeUnit unit of time in which the interval length is specified
+         */
+        public TimeInterval(long length, TimeUnit timeUnit) {
+            this.length = length;
+            this.timeUnit = timeUnit;
+            this.lengthNanos = timeUnit.toNanos(length);
+        }
+
+        public static TimeInterval seconds(long length) {
+            return new TimeInterval(length, TimeUnit.SECONDS);
+        }
+
+        public static TimeInterval milliSeconds(long length) {
+            return new TimeInterval(length, TimeUnit.MILLISECONDS);
+        }
+
+        public static TimeInterval microSeconds(long length) {
+            return new TimeInterval(length, TimeUnit.MICROSECONDS);
+        }
+
+        public long lengthNanos() {
+            return lengthNanos;
+        }
+
+        public long length() {
+            return length;
+        }
+
+        public TimeUnit timeUnit() {
+            return timeUnit;
+        }
+
+        @Override
+        public String toString() {
+            return "TimeInterval{" +
+                    "length=" + length +
+                    ", timeUnit=" + timeUnit +
+                    '}';
+        }
+    }
+
+    /**
+     * The time stamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
+     * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1) where failCount = 1, 2, 3, ...
+     * nextRetry = Min(nextRetry, currentTime + maxDelay)
+     *
+     * @param initialDelay      initial delay of the first retry
+     * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
+     * @param maxRetries        maximum number of times a tuple is retried before being acked and scheduled for commit
+     * @param maxDelay          maximum amount of time waiting before retrying
+     *
+     */
+    public KafkaSpoutRetryExponentialBackoff(TimeInterval initialDelay, TimeInterval delayPeriod, int maxRetries, TimeInterval maxDelay) {
+        this.initialDelay = initialDelay;
+        this.delayPeriod = delayPeriod;
+        this.maxRetries = maxRetries;
+        this.maxDelay = maxDelay;
+        LOG.debug("Instantiated {}", this);
+    }
+
+    @Override
+    public Set<TopicPartition> retriableTopicPartitions() {
+        final Set<TopicPartition> tps = new TreeSet<>();
+        final long currentTimeNanos = System.nanoTime();
+        for (RetrySchedule retrySchedule : retrySchedules) {
+            if (retrySchedule.retry(currentTimeNanos)) {
+                final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+                tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
+            } else {
+                break;  // Stop searching as soon as passed current time
+            }
+        }
+        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
+        return tps;
+    }
+
+    @Override
+    public boolean isReady(KafkaSpoutMessageId msgId) {
+        boolean retry = false;
+        if (toRetryMsgs.contains(msgId)) {
+            final long currentTimeNanos = System.nanoTime();
+            for (RetrySchedule retrySchedule : retrySchedules) {
+                if (retrySchedule.retry(currentTimeNanos)) {
+                    if (retrySchedule.msgId.equals(msgId)) {
+                        retry = true;
+                        LOG.debug("Found entry to retry {}", retrySchedule);
+                    }
+                } else {
+                    LOG.debug("Entry to retry not found {}", retrySchedule);
+                    break;  // Stop searching as soon as passed current time
+                }
+            }
+        }
+        return retry;
+    }
+
+    @Override
+    public boolean isScheduled(KafkaSpoutMessageId msgId) {
+        return toRetryMsgs.contains(msgId);
+    }
+
+    @Override
+    public boolean remove(KafkaSpoutMessageId msgId) {
+        boolean removed = false;
+        if (toRetryMsgs.contains(msgId)) {
+            for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+                final RetrySchedule retrySchedule = iterator.next();
+                if (retrySchedule.msgId().equals(msgId)) {
+                    iterator.remove();
+                    toRetryMsgs.remove(msgId);
+                    removed = true;
+                    break;
+                }
+            }
+        }
+        LOG.debug(removed ? "Removed {} " : "Not removed {}", msgId);
+        LOG.trace("Current state {}", retrySchedules);
+        return removed;
+    }
+
+    @Override
+    public boolean retainAll(Collection<TopicPartition> topicPartitions) {
+        boolean result = false;
+        for (Iterator<RetrySchedule> rsIterator = retrySchedules.iterator(); rsIterator.hasNext(); ) {
+            final RetrySchedule retrySchedule = rsIterator.next();
+            final KafkaSpoutMessageId msgId = retrySchedule.msgId;
+            final TopicPartition tpRetry= new TopicPartition(msgId.topic(), msgId.partition());
+            if (!topicPartitions.contains(tpRetry)) {
+                rsIterator.remove();
+                toRetryMsgs.remove(msgId);
+                LOG.debug("Removed {}", retrySchedule);
+                LOG.trace("Current state {}", retrySchedules);
+                result = true;
+            }
+        }
+        return result;
+    }
+
+    @Override
+    public void schedule(KafkaSpoutMessageId msgId) {
+        if (msgId.numFails() > maxRetries) {
+            LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+        } else {
+            if (toRetryMsgs.contains(msgId)) {
+                for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
+                    final RetrySchedule retrySchedule = iterator.next();
+                    if (retrySchedule.msgId().equals(msgId)) {
+                        iterator.remove();
+                        toRetryMsgs.remove(msgId);
+                    }
+                }
+            }
+            final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
+            retrySchedules.add(retrySchedule);
+            toRetryMsgs.add(msgId);
+            LOG.debug("Scheduled. {}", retrySchedule);
+            LOG.trace("Current state {}", retrySchedules);
+        }
+    }
+
+    // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
+    private long nextTime(KafkaSpoutMessageId msgId) {
+        final long currentTimeNanos = System.nanoTime();
+        final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
+                ? currentTimeNanos + initialDelay.lengthNanos()
+                : (long) (currentTimeNanos + Math.pow(delayPeriod.lengthNanos, msgId.numFails() - 1));
+        return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutRetryExponentialBackoff{" +
+                "delay=" + initialDelay +
+                ", ratio=" + delayPeriod +
+                ", maxRetries=" + maxRetries +
+                ", maxRetryDelay=" + maxDelay +
+                '}';
+    }
+}

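One subtlety of nextTime() above: the geometric progression is computed on the delay period expressed in nanoseconds, so with small periods the schedule saturates at maxDelay after very few failures. A worked example using the defaults that KafkaSpoutConfig.Builder passes in (initialDelay = 0 s, delayPeriod = 2 ms, maxDelay = 10 s):

    import java.util.concurrent.TimeUnit;

    public class BackoffTiming {
        public static void main(String[] args) {
            long delayPeriodNanos = TimeUnit.MILLISECONDS.toNanos(2);   // 2_000_000 ns
            long maxDelayNanos = TimeUnit.SECONDS.toNanos(10);          // 10_000_000_000 ns
            for (int numFails = 1; numFails <= 4; numFails++) {
                long delta = numFails == 1 ? 0L : (long) Math.pow(delayPeriodNanos, numFails - 1);
                System.out.println(numFails + " fail(s) -> retry in +" + Math.min(delta, maxDelayNanos) + " ns");
            }
            // 1 fail(s) -> +0 ns                   (initialDelay)
            // 2 fail(s) -> +2000000 ns             (2 ms)
            // 3 fail(s) -> +10000000000 ns         (capped: 2_000_000^2 ns would be ~4000 s)
            // 4 fail(s) -> +10000000000 ns         (capped at maxDelay)
        }
    }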
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
new file mode 100644
index 0000000..5aab167
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
+/**
+ * Represents the logic that manages the retrial of failed tuples.
+ */
+public interface KafkaSpoutRetryService extends Serializable {
+    /**
+     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+     * @param msgId message to schedule for retrial
+     */
+    void schedule(KafkaSpoutMessageId msgId);
+
+    /**
+     * Removes a message from the list of messages scheduled for retrial
+     * @param msgId message to remove from retrial
+     * @return true if the message was scheduled for retrial and removed, false otherwise
+     */
+    boolean remove(KafkaSpoutMessageId msgId);
+
+    /**
+     * Retains all the messages whose {@link TopicPartition} belongs to the specified {@code Collection<TopicPartition>}.
+     * All messages that come from a {@link TopicPartition} NOT existing in the collection will be removed.
+     * This method is useful to clean up state following a partition rebalance.
+     * @param topicPartitions Collection of {@link TopicPartition} for which to keep messages
+     * @return true if at least one message was removed, false otherwise
+     */
+    boolean retainAll(Collection<TopicPartition> topicPartitions);
+
+    /**
+     * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
+     * for which a tuple has failed and has retry time less than current time
+     */
+    Set<TopicPartition> retriableTopicPartitions();
+
+    /**
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
+     * i.e., it is scheduled and has a retry time that is less than the current time.
+     * @return true if message is ready to be retried, false otherwise
+     */
+    boolean isReady(KafkaSpoutMessageId msgId);
+
+    /**
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
+     * The message may or may not be ready to be retried yet.
+     * @return true if the message is scheduled to be retried, whether or not it is yet ready to be retried.
+     * Returns false if this message is not scheduled for retrial
+     */
+    boolean isScheduled(KafkaSpoutMessageId msgId);
+}

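Because the retry policy is now pluggable, any implementation of this interface can be handed to KafkaSpoutConfig.Builder. As a degenerate but complete illustration (hypothetical, not part of this commit), a service that never retries failed tuples:

    import org.apache.kafka.common.TopicPartition;

    import java.util.Collection;
    import java.util.Collections;
    import java.util.Set;

    public class NoRetryService implements KafkaSpoutRetryService {
        @Override
        public void schedule(KafkaSpoutMessageId msgId) { }       // drop failed tuples instead of rescheduling

        @Override
        public boolean remove(KafkaSpoutMessageId msgId) { return false; }   // nothing is ever scheduled

        @Override
        public boolean retainAll(Collection<TopicPartition> topicPartitions) { return false; }

        @Override
        public Set<TopicPartition> retriableTopicPartitions() { return Collections.emptySet(); }

        @Override
        public boolean isReady(KafkaSpoutMessageId msgId) { return false; }

        @Override
        public boolean isScheduled(KafkaSpoutMessageId msgId) { return false; }
    }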
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
index 43464a9..064a8bb 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
@@ -31,27 +31,31 @@ public class KafkaSpoutStream implements Serializable {
     private final String streamId;
     private final String topic;
 
-    /** Declare specified outputFields with default stream for the specified topic */
+    /** Represents the specified outputFields and topic with the default stream */
     KafkaSpoutStream(Fields outputFields, String topic) {
         this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
     }
 
-    /** Declare specified outputFields with specified stream for the specified topic */
+    /** Represents the specified outputFields and topic with the specified stream */
     KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
+        if (outputFields == null || streamId == null || topic == null) {
+            throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
+                    "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic));
+        }
         this.outputFields = outputFields;
         this.streamId = streamId;
         this.topic = topic;
     }
 
-    public Fields getOutputFields() {
+    Fields getOutputFields() {
         return outputFields;
     }
 
-    public String getStreamId() {
+    String getStreamId() {
         return streamId;
     }
 
-    public String getTopic() {
+    String getTopic() {
         return topic;
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
index 30215d1..dc5892e 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
@@ -33,7 +33,7 @@ import java.util.List;
 import java.util.Map;
 
 /**
- * Represents the output streams associated with each topic, and provides a public API to
+ * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to
  * declare output streams and emit tuples, on the appropriate stream, for all the topics specified.
  */
 public class KafkaSpoutStreams implements Serializable {
@@ -48,7 +48,7 @@ public class KafkaSpoutStreams implements Serializable {
 
     /**
      * @param topic the topic for which to get output fields
-     * @return the output fields declared
+     * @return the declared output fields
      */
     public Fields getOutputFields(String topic) {
         if (topicToStream.containsKey(topic)) {
@@ -79,7 +79,7 @@ public class KafkaSpoutStreams implements Serializable {
         return new ArrayList<>(topicToStream.keySet());
     }
 
-    void declareOutputFields(OutputFieldsDeclarer declarer) {
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
         for (KafkaSpoutStream stream : topicToStream.values()) {
             if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) {
                 declarer.declareStream(stream.getStreamId(), stream.getOutputFields());
@@ -88,8 +88,8 @@ public class KafkaSpoutStreams implements Serializable {
         }
     }
 
-    void emit(SpoutOutputCollector collector, KafkaSpoutMessageId messageId) {
-        collector.emit(getStreamId(messageId.topic()), messageId.getTuple(), messageId);
+    public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
+        collector.emit(getStreamId(messageId.topic()), tuple, messageId);
     }
 
     @Override
@@ -103,11 +103,11 @@ public class KafkaSpoutStreams implements Serializable {
         private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();
 
         /**
-         * Creates a {@link KafkaSpoutStream} with this particular stream for each topic specified.
-         * All the topics will have the same stream id and output fields.
+         * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified.
+         * All topics will have the same stream id and output fields.
          */
         public Builder(Fields outputFields, String... topics) {
-            this(outputFields, Utils.DEFAULT_STREAM_ID, topics);
+            addStream(outputFields, topics);
         }
 
         /**
@@ -115,16 +115,14 @@ public class KafkaSpoutStreams implements Serializable {
          * All the topics will have the same stream id and output fields.
          */
         public Builder (Fields outputFields, String streamId, String... topics) {
-            for (String topic : topics) {
-                topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
-            }
+            addStream(outputFields, streamId, topics);
         }
 
         /**
          * Adds this stream to the state representing the streams associated with each topic
          */
         public Builder(KafkaSpoutStream stream) {
-            topicToStream.put(stream.getTopic(), stream);
+            addStream(stream);
         }
 
         /**
@@ -139,9 +137,7 @@ public class KafkaSpoutStreams implements Serializable {
          * Please refer to javadoc in {@link #Builder(Fields, String...)}
          */
         public Builder addStream(Fields outputFields, String... topics) {
-            for (String topic : topics) {
-                topicToStream.put(topic, new KafkaSpoutStream(outputFields, topic));
-            }
+            addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics);
             return this;
         }
 

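A usage sketch of the Builder above, declaring a default stream for two topics and a named stream for a third (the stream id, field names, and topic names are illustrative; the terminal build() call is assumed from the full class, outside these hunks):

    KafkaSpoutStreams streams = new KafkaSpoutStreams.Builder(
                new Fields("topic", "partition", "offset"), "test-topic-0", "test-topic-1")     // default stream
            .addStream(new Fields("topic", "partition", "offset", "key", "value"), "rich-stream", "test-topic-2")
            .build();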
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
index 45aab48..3bb71a8 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
@@ -21,8 +21,38 @@ package org.apache.storm.kafka.spout;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 
 import java.io.Serializable;
+import java.util.Arrays;
+import java.util.Collections;
 import java.util.List;
 
-public interface KafkaSpoutTupleBuilder<K,V> extends Serializable {
-    List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord, KafkaSpoutStreams kafkaSpoutStreams);
+/**
+ * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to build tuples from {@link ConsumerRecord}s.
+ * Users must subclass this abstract class to provide their implementation. See also {@link KafkaSpoutTuplesBuilder}
+ */
+public abstract class KafkaSpoutTupleBuilder<K,V> implements Serializable {
+    private List<String> topics;
+
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public KafkaSpoutTupleBuilder(String... topics) {
+        if (topics == null || topics.length == 0) {
+            throw new IllegalArgumentException("Must specify at least one topic. It cannot be null or empty");
+        }
+        this.topics = Arrays.asList(topics);
+    }
+
+    /**
+     * @return list of topics that use this implementation to build tuples
+     */
+    public List<String> getTopics() {
+        return Collections.unmodifiableList(topics);
+    }
+
+    /**
+     * Builds a tuple, i.e. a list of values, from the {@link ConsumerRecord} specified as parameter
+     * @param consumerRecord record whose contents are used to build the tuple
+     * @return list of values constituting one tuple
+     */
+    public abstract List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord);
 }

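With the tuple builder now an abstract, per-topic class, the KafkaRecordTupleBuilder deleted at the top of this patch would be rewritten along these lines (a hypothetical subclass shown for illustration only):

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.tuple.Values;

    import java.util.List;

    public class RecordTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
        public RecordTupleBuilder(String... topics) {
            super(topics);                                   // topics this builder handles
        }

        @Override
        public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
            // one tuple per record: topic, partition, offset, key, value
            return new Values(consumerRecord.topic(),
                    consumerRecord.partition(),
                    consumerRecord.offset(),
                    consumerRecord.key(),
                    consumerRecord.value());
        }
    }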
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
new file mode 100644
index 0000000..d67c69d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s.
+ * The logic is supplied by the user through one or more {@link KafkaSpoutTupleBuilder} implementations, each covering one or more topics
+ */
+public class KafkaSpoutTuplesBuilder<K,V> implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilder.class);
+
+    private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+    private KafkaSpoutTuplesBuilder(Builder<K,V> builder) {
+        this.topicToTupleBuilders = builder.topicToTupleBuilders;
+        LOG.debug("Instantiated {}", this);
+    }
+
+    public static class Builder<K,V> {
+        private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders;
+        private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
+
+        @SafeVarargs
+        public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) {
+            if (tupleBuilders == null || tupleBuilders.length == 0) {
+                throw new IllegalArgumentException("Must specify at least one tuple builder per topic declared in KafkaSpoutStreams");
+            }
+
+            this.tupleBuilders = Arrays.asList(tupleBuilders);
+            topicToTupleBuilders = new HashMap<>();
+        }
+
+        public KafkaSpoutTuplesBuilder<K,V> build() {
+            for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) {
+                for (String topic : tupleBuilder.getTopics()) {
+                    if (!topicToTupleBuilders.containsKey(topic)) {
+                        topicToTupleBuilders.put(topic, tupleBuilder);
+                    }
+                }
+            }
+            return new KafkaSpoutTuplesBuilder<>(this);
+        }
+    }
+
+    public List<Object> buildTuple(ConsumerRecord<K,V> consumerRecord) {
+        final String topic = consumerRecord.topic();
+        final KafkaSpoutTupleBuilder<K, V> tupleBuilder = topicToTupleBuilders.get(topic);
+        if (tupleBuilder == null) {
+            throw new IllegalStateException("No tuple builder registered for topic [" + topic + "]");
+        }
+        return tupleBuilder.buildTuple(consumerRecord);
+    }
+
+    @Override
+    public String toString() {
+        return "KafkaSpoutTuplesBuilder{" +
+                "topicToTupleBuilders=" + topicToTupleBuilders +
+                '}';
+    }
+}

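Wiring per-topic builders together happens through the nested Builder; note that build() keeps the first builder registered for a topic and skips later ones. A minimal sketch, reusing the hypothetical ValueOnlyTupleBuilder from above:

    KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
            new KafkaSpoutTuplesBuilder.Builder<>(
                    new ValueOnlyTupleBuilder<String, String>("topic-a", "topic-b"))
                    .build();
    // tuplesBuilder.buildTuple(record) now dispatches to the builder
    // registered for record.topic()
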
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
new file mode 100644
index 0000000..7a94a50
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTestBolt.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichBolt;
+import org.apache.storm.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+public class KafkaSpoutTestBolt extends BaseRichBolt {
+    protected static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTestBolt.class);
+    private OutputCollector collector;
+
+    @Override
+    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        this.collector = collector;
+    }
+
+    @Override
+    public void execute(Tuple input) {
+        LOG.debug("input = [{}]", input);
+        collector.ack(input);
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
index 4fcc3ef..0691dd3 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaSpoutTopologyMain.java
@@ -22,11 +22,13 @@ import org.apache.storm.Config;
 import org.apache.storm.LocalCluster;
 import org.apache.storm.StormSubmitter;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.KafkaRecordTupleBuilder;
 import org.apache.storm.kafka.spout.KafkaSpout;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
 import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
 
@@ -35,9 +37,9 @@ import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
 import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.PollStrategy.STREAM;
 
 public class KafkaSpoutTopologyMain {
     private static final String[] STREAMS = new String[]{"test_stream","test1_stream","test2_stream"};
@@ -80,21 +82,29 @@ public class KafkaSpoutTopologyMain {
 
     public static StormTopology getTopolgyKafkaSpout() {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams()), getTupleBuilder()), 1);
-        tp.setBolt("kafka_bolt", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
-        tp.setBolt("kafka_bolt_1", new KafkaTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams())), 1);
+        tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[0]);
+        tp.setBolt("kafka_bolt_1", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAMS[2]);
         return tp.createTopology();
     }
 
     public static KafkaSpoutConfig<String,String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams) {
-        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams)
+        return new KafkaSpoutConfig.Builder<String, String>(getKafkaConsumerProps(), kafkaSpoutStreams, getTuplesBuilder(), getRetryService())
                 .setOffsetCommitPeriodMs(10_000)
                 .setFirstPollOffsetStrategy(EARLIEST)
-                .setPollStrategy(STREAM)
                 .setMaxUncommittedOffsets(250)
                 .build();
     }
 
+    private static KafkaSpoutRetryService getRetryService() {
+        return new KafkaSpoutRetryExponentialBackoff(getTimeInterval(500, TimeUnit.MICROSECONDS),
+                TimeInterval.milliSeconds(2), Integer.MAX_VALUE, TimeInterval.seconds(10));
+    }
+
+    private static TimeInterval getTimeInterval(long delay, TimeUnit timeUnit) {
+        return new TimeInterval(delay, timeUnit);
+    }
+
     public static Map<String,Object> getKafkaConsumerProps() {
         Map<String, Object> props = new HashMap<>();
 //        props.put(KafkaSpoutConfig.Consumer.ENABLE_AUTO_COMMIT, "true");
@@ -105,16 +115,19 @@ public class KafkaSpoutTopologyMain {
         return props;
     }
 
-    public static KafkaSpoutTupleBuilder<String,String> getTupleBuilder() {
-        return new KafkaRecordTupleBuilder<>();
+    public static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
+        return new KafkaSpoutTuplesBuilder.Builder<>(
+                new TopicsTest0Test1TupleBuilder<String, String>(TOPICS[0], TOPICS[1]),
+                new TopicTest2TupleBuilder<String, String>(TOPICS[2]))
+                .build();
     }
 
     public static KafkaSpoutStreams getKafkaSpoutStreams() {
         final Fields outputFields = new Fields("topic", "partition", "offset", "key", "value");
         final Fields outputFields1 = new Fields("topic", "partition", "offset");
         return new KafkaSpoutStreams.Builder(outputFields, STREAMS[0], new String[]{TOPICS[0], TOPICS[1]})  // contents of topics test, test1, sent to test_stream
-                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents topic test2 sent to test_stream
-                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents topic test2 sent to test_stream2
+                .addStream(outputFields, STREAMS[0], new String[]{TOPICS[2]})  // contents of topic test2 sent to test_stream
+                .addStream(outputFields1, STREAMS[2], new String[]{TOPICS[2]})  // contents of topic test2 sent to test2_stream
                 .build();
     }
 }

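As configured above, retries start after an initial delay of 500 microseconds, grow from a 2 millisecond delay period, are effectively unbounded in count (Integer.MAX_VALUE), and are capped at 10 seconds. The general shape of such a capped exponential backoff is sketched below (illustrative only, assuming base-2 growth; not necessarily the exact formula in KafkaSpoutRetryExponentialBackoff):

    // Next retry delay: delayPeriodMs * 2^retryCount, capped at maxDelayMs
    static long nextDelayMs(long delayPeriodMs, int retryCount, long maxDelayMs) {
        final double delay = delayPeriodMs * Math.pow(2, retryCount);
        return (long) Math.min(delay, maxDelayMs);
    }
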
http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
deleted file mode 100644
index c9ff9d5..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/KafkaTestBolt.java
+++ /dev/null
@@ -1,52 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout.test;
-
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Map;
-
-public class KafkaTestBolt extends BaseRichBolt {
-    protected static final Logger LOG = LoggerFactory.getLogger(KafkaTestBolt.class);
-
-
-    private OutputCollector collector;
-
-    @Override
-    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
-        this.collector = collector;
-    }
-
-    @Override
-    public void execute(Tuple input) {
-        LOG.debug("input = [" + input + "]");
-        collector.ack(input);
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
new file mode 100644
index 0000000..ca65177
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicTest2TupleBuilder.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicTest2TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public TopicTest2TupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        return new Values(consumerRecord.topic(),
+                consumerRecord.partition(),
+                consumerRecord.offset());
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/332afc40/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
new file mode 100644
index 0000000..4c55aa1
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/test/TopicsTest0Test1TupleBuilder.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.test;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
+import org.apache.storm.tuple.Values;
+
+import java.util.List;
+
+public class TopicsTest0Test1TupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
+    /**
+     * @param topics list of topics that use this implementation to build tuples
+     */
+    public TopicsTest0Test1TupleBuilder(String... topics) {
+        super(topics);
+    }
+
+    @Override
+    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
+        return new Values(consumerRecord.topic(),
+                consumerRecord.partition(),
+                consumerRecord.offset(),
+                consumerRecord.key(),
+                consumerRecord.value());
+    }
+}
\ No newline at end of file