Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:07 UTC

[06/14] storm git commit: STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: copied external/storm-kafka-client from 1.x-branch

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 7c97ac9..7aa836c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,232 +18,810 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.Config;
+import org.apache.storm.annotation.InterfaceStability;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the topics to subscribe to.
  */
 public class KafkaSpoutConfig<K, V> implements Serializable {
-    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;            // 200ms
-    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;   // 30s
-    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
-    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    // 10,000,000 records => 80MBs of memory footprint in the worst case
-
-    // Kafka property names
-    public interface Consumer {
-        String GROUP_ID = "group.id";
-        String BOOTSTRAP_SERVERS = "bootstrap.servers";
-        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
-        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
-        String KEY_DESERIALIZER = "key.deserializer";
-        String VALUE_DESERIALIZER = "value.deserializer";
-    }
 
-    /**
-     * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will
-     * affect the number of consumer records returned in the first poll. By default this parameter is set to UNCOMMITTED_EARLIEST. <br/><br/>
-     * The allowed values are EARLIEST, LATEST, UNCOMMITTED_EARLIEST, UNCOMMITTED_LATEST. <br/>
-     * <ul>
-     * <li>EARLIEST means that the kafka spout polls records starting in the first offset of the partition, regardless of previous commits</li>
-     * <li>LATEST means that the kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits</li>
-     * <li>UNCOMMITTED_EARLIEST means that the kafka spout polls records from the last committed offset, if any.
-     * If no offset has been committed, it behaves as EARLIEST.</li>
-     * <li>UNCOMMITTED_LATEST means that the kafka spout polls records from the last committed offset, if any.
-     * If no offset has been committed, it behaves as LATEST.</li>
-     * </ul>
-     * */
-    public enum FirstPollOffsetStrategy {
-        EARLIEST,
-        LATEST,
-        UNCOMMITTED_EARLIEST,
-        UNCOMMITTED_LATEST }
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutConfig.class);
+    private static final long serialVersionUID = 141902646130682494L;
+    // 200ms
+    public static final long DEFAULT_POLL_TIMEOUT_MS = 200;
+    // 30s
+    public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;
+    // Retry forever
+    public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;
+    // 10,000,000 records => 80MBs of memory footprint in the worst case
+    public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;
+    // 2s
+    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000;
+
+    public static final FirstPollOffsetStrategy DEFAULT_FIRST_POLL_OFFSET_STRATEGY = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+
+    public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =
+        new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+            DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+
+    public static final ProcessingGuarantee DEFAULT_PROCESSING_GUARANTEE = ProcessingGuarantee.AT_LEAST_ONCE;
+
+    public static final KafkaTupleListener DEFAULT_TUPLE_LISTENER = new EmptyKafkaTupleListener();
+
+    public static final int DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS = 60;
+
 
     // Kafka consumer configuration
     private final Map<String, Object> kafkaProps;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
+    private final Subscription subscription;
     private final long pollTimeoutMs;
 
     // Kafka spout configuration
+    private final RecordTranslator<K, V> translator;
     private final long offsetCommitPeriodMs;
-    private final int maxRetries;
     private final int maxUncommittedOffsets;
     private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private final KafkaSpoutStreams kafkaSpoutStreams;
-    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
     private final KafkaSpoutRetryService retryService;
+    private final KafkaTupleListener tupleListener;
+    private final long partitionRefreshPeriodMs;
+    private final boolean emitNullTuples;
+    private final SerializableDeserializer<K> keyDes;
+    private final Class<? extends Deserializer<K>> keyDesClazz;
+    private final SerializableDeserializer<V> valueDes;
+    private final Class<? extends Deserializer<V>> valueDesClazz;
+    private final ProcessingGuarantee processingGuarantee;
+    private final boolean tupleTrackingEnforced;
+    private final int metricsTimeBucketSizeInSecs;
 
-    private KafkaSpoutConfig(Builder<K,V> builder) {
-        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-        this.keyDeserializer = builder.keyDeserializer;
-        this.valueDeserializer = builder.valueDeserializer;
+    /**
+     * Creates a new KafkaSpoutConfig using a Builder.
+     *
+     * @param builder The Builder to construct the KafkaSpoutConfig from
+     */
+    public KafkaSpoutConfig(Builder<K, V> builder) {
+        setKafkaPropsForProcessingGuarantee(builder);
+        this.kafkaProps = builder.kafkaProps;
+        this.subscription = builder.subscription;
+        this.translator = builder.translator;
         this.pollTimeoutMs = builder.pollTimeoutMs;
         this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.maxRetries = builder.maxRetries;
         this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
         this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-        this.tuplesBuilder = builder.tuplesBuilder;
         this.retryService = builder.retryService;
+        this.tupleListener = builder.tupleListener;
+        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+        this.emitNullTuples = builder.emitNullTuples;
+        this.keyDes = builder.keyDes;
+        this.keyDesClazz = builder.keyDesClazz;
+        this.valueDes = builder.valueDes;
+        this.valueDesClazz = builder.valueDesClazz;
+        this.processingGuarantee = builder.processingGuarantee;
+        this.tupleTrackingEnforced = builder.tupleTrackingEnforced;
+        this.metricsTimeBucketSizeInSecs = builder.metricsTimeBucketSizeInSecs;
     }
 
-    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
-        // set defaults for properties not specified
-        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+    /**
+     * Defines how the {@link KafkaSpout} seeks the offset to be used in the first poll to Kafka upon topology deployment.
+     * By default this parameter is set to UNCOMMITTED_EARLIEST. 
+     */
+    public enum FirstPollOffsetStrategy {
+        /**
+         * The kafka spout polls records starting in the first offset of the partition, regardless of previous commits. This setting only
+         * takes effect on topology deployment
+         */
+        EARLIEST,
+        /**
+         * The kafka spout polls records with offsets greater than the last offset in the partition, regardless of previous commits. This
+         * setting only takes effect on topology deployment
+         */
+        LATEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as EARLIEST
+         */
+        UNCOMMITTED_EARLIEST,
+        /**
+         * The kafka spout polls records from the last committed offset, if any. If no offset has been committed it behaves as LATEST
+         */
+        UNCOMMITTED_LATEST;
+
+        @Override
+        public String toString() {
+            return "FirstPollOffsetStrategy{" + super.toString() + "}";
         }
-        return kafkaProps;
     }
 
-    public static class Builder<K,V> {
+    /**
+     * This enum controls when the tuple with the {@link ConsumerRecord} for an offset is marked as processed,
+     * i.e. when the offset can be committed to Kafka. The default value is AT_LEAST_ONCE.
+     * The commit interval is controlled by {@link KafkaSpoutConfig#getOffsetsCommitPeriodMs() }, if the mode commits on an interval.
+     * NO_GUARANTEE may be removed in a later release without warning, as we're still evaluating whether it makes sense to keep it.
+     */
+    @InterfaceStability.Unstable
+    public enum ProcessingGuarantee {
+        /**
+         * An offset is ready to commit only after the corresponding tuple has been processed and acked (at least once). If a tuple fails or
+         * times out it will be re-emitted, as controlled by the {@link KafkaSpoutRetryService}. Commits synchronously on the defined
+         * interval.
+         */
+        AT_LEAST_ONCE,
+        /**
+         * Every offset will be synchronously committed to Kafka right after being polled but before being emitted to the downstream
+         * components of the topology. The commit interval is ignored. This mode guarantees that the offset is processed at most once by
+         * ensuring the spout won't retry tuples that fail or time out after the commit to Kafka has been done
+         */
+        AT_MOST_ONCE,
+        /**
+         * The polled offsets are ready to commit immediately after being polled. The offsets are committed periodically, i.e. a message may
+         * be processed 0, 1 or more times. This behavior is similar to setting enable.auto.commit=true in the consumer, but allows the
+         * spout to control when commits occur. Commits asynchronously on the defined interval.
+         */
+        NO_GUARANTEE,
+    }
+
+    public static class Builder<K, V> {
+
         private final Map<String, Object> kafkaProps;
-        private Deserializer<K> keyDeserializer;
-        private Deserializer<V> valueDeserializer;
+        private final Subscription subscription;
+        private final SerializableDeserializer<K> keyDes;
+        private final Class<? extends Deserializer<K>> keyDesClazz;
+        private final SerializableDeserializer<V> valueDes;
+        private final Class<? extends Deserializer<V>> valueDesClazz;
+        private RecordTranslator<K, V> translator;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
-        private int maxRetries = DEFAULT_MAX_RETRIES;
-        private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-        private final KafkaSpoutStreams kafkaSpoutStreams;
+        private FirstPollOffsetStrategy firstPollOffsetStrategy = DEFAULT_FIRST_POLL_OFFSET_STRATEGY;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
-        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-        private final KafkaSpoutRetryService retryService;
-
-        /**
-         * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
-         * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/>
-         * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
-         *           DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
-         */
-        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
-                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
-            this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
-                    new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
-                            DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
-        }
-
-        /***
-         * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
-         * The optional configuration can be specified using the set methods of this builder
-         * @param kafkaProps    properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
-         * @param kafkaSpoutStreams    streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
-         * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
-         * @param retryService  logic that manages the retrial of failed tuples
-         */
-        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
-                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
-            if (kafkaProps == null || kafkaProps.isEmpty()) {
-                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
-            }
+        private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
+        private KafkaTupleListener tupleListener = DEFAULT_TUPLE_LISTENER;
+        private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
+        private boolean emitNullTuples = false;
+        private ProcessingGuarantee processingGuarantee = DEFAULT_PROCESSING_GUARANTEE;
+        private boolean tupleTrackingEnforced = false;
+        private int metricsTimeBucketSizeInSecs = DEFAULT_METRICS_TIME_BUCKET_SIZE_SECONDS;
+
+        public Builder(String bootstrapServers, String... topics) {
+            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+        }
 
-            if (kafkaSpoutStreams == null)  {
-                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
-            }
+        public Builder(String bootstrapServers, Collection<String> topics) {
+            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(),
+                new NamedTopicFilter(new HashSet<String>(topics))));
+        }
 
-            if (tuplesBuilder == null) {
-                throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
-            }
+        public Builder(String bootstrapServers, Pattern topics) {
+            this(bootstrapServers, (SerializableDeserializer) null, (SerializableDeserializer) null, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
+        }
 
-            if (retryService == null) {
-                throw new IllegalArgumentException("Must specify at implementation of retry service");
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.lang.String...)} instead, and set the deserializer with
+         * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+         * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String... topics) {
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+        }
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.util.Collection) } instead, and set the deserializer with
+         * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+         * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics))));
+        }
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.util.regex.Pattern) } instead, and set the deserializer with
+         * {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+         * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
+        }
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, org.apache.storm.kafka.spout.Subscription) } instead, and set the
+         * deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and
+         * {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) {
+            this(bootstrapServers, keyDes, null, valDes, null, subscription);
+        }
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.lang.String...)} instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String... topics) {
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(topics)));
+        }
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.util.Collection) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new NamedTopicFilter(new HashSet<String>(topics))));
+        }
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, java.util.regex.Pattern) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
+            this(bootstrapServers, keyDes, valDes, new ManualPartitionSubscription(new RoundRobinManualPartitioner(), new PatternTopicFilter(topics)));
+        }
+
+        /**
+         * @deprecated Please use {@link #Builder(java.lang.String, org.apache.storm.kafka.spout.Subscription) } instead, and set the deserializer with {@link #setProp(java.lang.String, java.lang.Object)}, {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} and {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG}
+         */
+        @Deprecated
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) {
+            this(bootstrapServers, null, keyDes, null, valDes, subscription);
+        }
+
+        /**
+         * Create a KafkaSpoutConfig builder with default property values and no key/value deserializers.
+         *
+         * @param bootstrapServers The bootstrap servers the consumer will use
+         * @param subscription The subscription defining which topics and partitions each spout instance will read.
+         */
+        public Builder(String bootstrapServers, Subscription subscription) {
+            this(bootstrapServers, null, null, null, null, subscription);
+        }
+
+        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+            SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
+
+            this(keyDes, keyDesClazz, valDes, valDesClazz, subscription,
+                    new DefaultRecordTranslator<K, V>(), new HashMap<String, Object>());
+
+            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+                throw new IllegalArgumentException("bootstrap servers cannot be null or empty");
             }
+            kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+
+            setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz);
+        }
 
+        /**
+         * This constructor will always be called by one of the methods {@code setKey} or {@code setValue}, which implies
+         * that only one of its SerDe parameters will be non-null; the corresponding Kafka property is set for that one.
+         */
+        @SuppressWarnings("unchecked")
+        private Builder(final Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+                        SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+
+            this(keyDes, keyDesClazz, valueDes, valueDesClazz, builder.subscription,
+                    (RecordTranslator<K, V>) builder.translator, new HashMap<>(builder.kafkaProps));
+
+            this.pollTimeoutMs = builder.pollTimeoutMs;
+            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+            this.retryService = builder.retryService;
+
+            setNonNullSerDeKafkaProp(keyDes, keyDesClazz, valueDes, valueDesClazz);
+        }
+
+        private Builder(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+               SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz,
+               Subscription subscription, RecordTranslator<K, V> translator, Map<String, Object> kafkaProps) {
+            this.keyDes = keyDes;
+            this.keyDesClazz = keyDesClazz;
+            this.valueDes = valueDes;
+            this.valueDesClazz = valueDesClazz;
+            this.subscription = subscription;
+            this.translator = translator;
             this.kafkaProps = kafkaProps;
-            this.kafkaSpoutStreams = kafkaSpoutStreams;
-            this.tuplesBuilder = tuplesBuilder;
-            this.retryService = retryService;
+        }
+
+        private void setNonNullSerDeKafkaProp(SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+            if (keyDesClazz != null) {
+                kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDesClazz);
+            }
+            if (keyDes != null) {
+                kafkaProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, keyDes.getClass());
+            }
+            if (valueDesClazz != null) {
+                kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDesClazz);
+            }
+            if (valueDes != null) {
+                kafkaProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, valueDes.getClass());
+            }
+        }
+
+        /**
+         * Specifying this key deserializer overrides the property key.deserializer. If you have set a custom RecordTranslator before
+         * calling this it may result in class cast exceptions at runtime.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
+         */
+        @Deprecated
+        public <NK> Builder<NK, V> setKey(SerializableDeserializer<NK> keyDeserializer) {
+            return new Builder<>(this, keyDeserializer, null, null, null);
         }
 
         /**
-         * Specifying this key deserializer overrides the property key.deserializer
+         * Specify a class that can be instantiated to create a key.deserializer. This is the same as setting key.deserializer, but
+         * overrides it. If you have set a custom RecordTranslator before calling this it may result in class cast exceptions at runtime.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
          */
-        public Builder<K,V> setKeyDeserializer(Deserializer<K> keyDeserializer) {
-            this.keyDeserializer = keyDeserializer;
+        @Deprecated
+        public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
+            return new Builder<>(this, null, clazz, null, null);
+        }
+
+        /**
+         * Specifying this value deserializer overrides the property value.deserializer. If you have set a custom RecordTranslator before
+         * calling this it may result in class cast exceptions at runtime.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+         */
+        @Deprecated
+        public <NV> Builder<K, NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
+            return new Builder<>(this, null, null, valueDeserializer, null);
+        }
+
+        /**
+         * Specify a class that can be instantiated to create a value.deserializer. This is the same as setting value.deserializer, but
+         * overrides it. If you have set a custom RecordTranslator before calling this it may result in class cast exceptions at runtime.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+         */
+        @Deprecated
+        public <NV> Builder<K, NV> setValue(Class<? extends Deserializer<NV>> clazz) {
+            return new Builder<>(this, null, null, null, clazz);
+        }
+
+        /**
+         * Set a {@link KafkaConsumer} property.
+         */
+        public Builder<K, V> setProp(String key, Object value) {
+            kafkaProps.put(key, value);
             return this;
         }
 
         /**
-         * Specifying this value deserializer overrides the property value.deserializer
+         * Set multiple {@link KafkaConsumer} properties.
          */
-        public Builder<K,V> setValueDeserializer(Deserializer<V> valueDeserializer) {
-            this.valueDeserializer = valueDeserializer;
+        public Builder<K, V> setProp(Map<String, Object> props) {
+            kafkaProps.putAll(props);
             return this;
         }
 
         /**
-         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 2s
-         * @param pollTimeoutMs time in ms
+         * Set multiple {@link KafkaConsumer} properties.
          */
-        public Builder<K,V> setPollTimeoutMs(long pollTimeoutMs) {
-            this.pollTimeoutMs = pollTimeoutMs;
+        public Builder<K, V> setProp(Properties props) {
+            for (Entry<Object, Object> entry : props.entrySet()) {
+                if (entry.getKey() instanceof String) {
+                    kafkaProps.put((String) entry.getKey(), entry.getValue());
+                } else {
+                    throw new IllegalArgumentException("Kafka Consumer property keys must be Strings");
+                }
+            }
             return this;
         }
 
         /**
-         * Specifies the period, in milliseconds, the offset commit task is periodically called. Default is 15s.
-         * @param offsetCommitPeriodMs time in ms
+         * Set the group.id for the consumers
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#GROUP_ID_CONFIG} instead
          */
-        public Builder<K,V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
-            this.offsetCommitPeriodMs = offsetCommitPeriodMs;
+        @Deprecated
+        public Builder<K, V> setGroupId(String id) {
+            return setProp("group.id", id);
+        }
+
+        /**
+         * Reset the bootstrap servers for the consumer.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#BOOTSTRAP_SERVERS_CONFIG} instead
+         */
+        @Deprecated
+        public Builder<K, V> setBootstrapServers(String servers) {
+            return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        }
+
+        /**
+         * The minimum amount of data the broker should return for a fetch request.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#FETCH_MIN_BYTES_CONFIG} instead
+         */
+        @Deprecated
+        public Builder<K, V> setFetchMinBytes(int bytes) {
+            return setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, bytes);
+        }
+
+        /**
+         * The maximum amount of data per-partition the broker will return.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#MAX_PARTITION_FETCH_BYTES_CONFIG} instead
+         */
+        @Deprecated
+        public Builder<K, V> setMaxPartitionFectchBytes(int bytes) {
+            return setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, bytes);
+        }
+
+        /**
+         * The maximum number of records a poll will return.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with {@link ConsumerConfig#MAX_POLL_RECORDS_CONFIG} instead
+         */
+        @Deprecated
+        public Builder<K, V> setMaxPollRecords(int records) {
+            return setProp(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, records);
+        }
+
+        //Security Related Configs
+        /**
+         * Configure the SSL Keystore for mutual authentication
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "ssl.keystore.location" and "ssl.keystore.password" instead
+         */
+        @Deprecated
+        public Builder<K, V> setSSLKeystore(String location, String password) {
+            return setProp("ssl.keystore.location", location)
+                .setProp("ssl.keystore.password", password);
+        }
+
+        /**
+         * Configure the SSL Keystore for mutual authentication
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "ssl.keystore.location", "ssl.keystore.password" and "ssl.key.password" instead
+         */
+        @Deprecated
+        public Builder<K, V> setSSLKeystore(String location, String password, String keyPassword) {
+            return setProp("ssl.key.password", keyPassword)
+                .setSSLKeystore(location, password);
+        }
+
+        /**
+         * Configure the SSL Truststore to authenticate with the brokers
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "security.protocol", "ssl.truststore.location" and "ssl.truststore.password" instead
+         */
+        @Deprecated
+        public Builder<K, V> setSSLTruststore(String location, String password) {
+            return setSecurityProtocol("SSL")
+                .setProp("ssl.truststore.location", location)
+                .setProp("ssl.truststore.password", password);
+        }
+
+        /**
+         * Protocol used to communicate with brokers. Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+         *
+         * @deprecated Please use {@link #setProp(java.lang.String, java.lang.Object) } with "security.protocol" instead
+         */
+        @Deprecated
+        public Builder<K, V> setSecurityProtocol(String protocol) {
+            return setProp("security.protocol", protocol);
+        }
+
+        //Spout Settings
+        /**
+         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200ms.
+         *
+         * @param pollTimeoutMs time in ms
+         */
+        public Builder<K, V> setPollTimeoutMs(long pollTimeoutMs) {
+            this.pollTimeoutMs = pollTimeoutMs;
             return this;
         }
 
         /**
-         * Defines the max number of retrials in case of tuple failure. The default is to retry forever, which means that
-         * no new records are committed until the previous polled records have been acked. This guarantees at once delivery of
-         * all the previously polled records.
-         * By specifying a finite value for maxRetries, the user decides to sacrifice guarantee of delivery for the previous
-         * polled records in favor of processing more records.
-         * @param maxRetries max number of retrials
+         * Specifies the period, in milliseconds, at which the offset commit task is called. Default is 30s.
+         *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE} or
+         * {@link ProcessingGuarantee#NO_GUARANTEE}.
+         *
+         * @param offsetCommitPeriodMs time in ms
          */
-        public Builder<K,V> setMaxRetries(int maxRetries) {
-            this.maxRetries = maxRetries;
+        public Builder<K, V> setOffsetCommitPeriodMs(long offsetCommitPeriodMs) {
+            this.offsetCommitPeriodMs = offsetCommitPeriodMs;
             return this;
         }
 
         /**
          * Defines the max number of polled offsets (records) that can be pending commit, before another poll can take place.
          * Once this limit is reached, no more offsets (records) can be polled until the next successful commit(s) sets the number
-         * of pending offsets bellow the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+         * of pending offsets below the threshold. The default is {@link #DEFAULT_MAX_UNCOMMITTED_OFFSETS}.
+         * This limit is per partition and may in some cases be exceeded,
+         * but each partition cannot exceed this limit by more than maxPollRecords - 1.
+         * 
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
          * @param maxUncommittedOffsets max number of records that can be pending commit
          */
-        public Builder<K,V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
+        public Builder<K, V> setMaxUncommittedOffsets(int maxUncommittedOffsets) {
             this.maxUncommittedOffsets = maxUncommittedOffsets;
             return this;
         }
 
         /**
-         * Sets the offset used by the Kafka spout in the first poll to Kafka broker upon process start.
-         * Please refer to to the documentation in {@link FirstPollOffsetStrategy}
+         * Sets the offset used by the Kafka spout in the first poll to the Kafka broker upon process start. Please refer to the
+         * documentation in {@link FirstPollOffsetStrategy}.
+         *
          * @param firstPollOffsetStrategy Offset used by Kafka spout first poll
-         * */
+         */
         public Builder<K, V> setFirstPollOffsetStrategy(FirstPollOffsetStrategy firstPollOffsetStrategy) {
             this.firstPollOffsetStrategy = firstPollOffsetStrategy;
             return this;
         }
 
-        public KafkaSpoutConfig<K,V> build() {
+        /**
+         * Sets the retry service for the spout to use.
+         *
+         * <p>This setting only has an effect if the configured {@link ProcessingGuarantee} is {@link ProcessingGuarantee#AT_LEAST_ONCE}.
+         *
+         * @param retryService the new retry service
+         * @return the builder (this).
+         */
+        public Builder<K, V> setRetry(KafkaSpoutRetryService retryService) {
+            if (retryService == null) {
+                throw new NullPointerException("retryService cannot be null");
+            }
+            this.retryService = retryService;
+            return this;
+        }
+
+        /**
+         * Sets the tuple listener for the spout to use.
+         *
+         * @param tupleListener the tuple listener
+         * @return the builder (this).
+         */
+        public Builder<K, V> setTupleListener(KafkaTupleListener tupleListener) {
+            if (tupleListener == null) {
+                throw new NullPointerException("KafkaTupleListener cannot be null");
+            }
+            this.tupleListener = tupleListener;
+            return this;
+        }
+
+        public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) {
+            this.translator = translator;
+            return this;
+        }
+
+        /**
+         * Configure a translator with tuples to be emitted on the default stream.
+         *
+         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+         * @param fields the names of the fields extracted
+         * @return this to be able to chain configuration
+         */
+        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
+        }
+
+        /**
+         * Configure a translator with tuples to be emitted to a given stream.
+         *
+         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+         * @param fields the names of the fields extracted
+         * @param stream the stream to emit the tuples on
+         * @return this to be able to chain configuration
+         */
+        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
+        }
+
+        /**
+         * Sets the partition refresh period in milliseconds. This is how often Kafka will be polled to check for new topics and/or new
+         * partitions. This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
+         * PatternSubscription rely on Kafka to handle this instead.
+         *
+         * @param partitionRefreshPeriodMs time in milliseconds
+         * @return the builder (this)
+         */
+        public Builder<K, V> setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) {
+            this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+            return this;
+        }
+
+        /**
+         * Specifies whether the spout should emit null tuples to the downstream component, or instead ack them directly without
+         * emitting. By default this parameter is set to false, which means that null tuples are not emitted.
+         *
+         * @param emitNullTuples whether null tuples should be emitted downstream
+         */
+        public Builder<K, V> setEmitNullTuples(boolean emitNullTuples) {
+            this.emitNullTuples = emitNullTuples;
+            return this;
+        }
+
+        /**
+         * Specifies which processing guarantee the spout should offer. Refer to the documentation for {@link ProcessingGuarantee}.
+         *
+         * @param processingGuarantee The processing guarantee the spout should offer.
+         */
+        public Builder<K, V> setProcessingGuarantee(ProcessingGuarantee processingGuarantee) {
+            this.processingGuarantee = processingGuarantee;
+            return this;
+        }
+
+        /**
+         * Specifies whether the spout should require Storm to track emitted tuples when using a {@link ProcessingGuarantee} other than
+         * {@link ProcessingGuarantee#AT_LEAST_ONCE}. The spout will always track emitted tuples when offering at-least-once guarantees
+         * regardless of this setting. This setting is false by default.
+         *
+         * <p>Enabling tracking can be useful even in cases where reliability is not a concern, because it allows
+         * {@link Config#TOPOLOGY_MAX_SPOUT_PENDING} to have an effect, and enables some spout metrics (e.g. complete-latency) that would
+         * otherwise be disabled.
+         *
+         * @param tupleTrackingEnforced true if Storm should track emitted tuples, false otherwise
+         */
+        public Builder<K, V> setTupleTrackingEnforced(boolean tupleTrackingEnforced) {
+            this.tupleTrackingEnforced = tupleTrackingEnforced;
+            return this;
+        }
+
+        /**
+         * The time period into which metrics data is bucketed.
+         * @param metricsTimeBucketSizeInSecs time in seconds
+         */
+        public Builder<K, V> setMetricsTimeBucketSizeInSecs(int metricsTimeBucketSizeInSecs) {
+            this.metricsTimeBucketSizeInSecs = metricsTimeBucketSizeInSecs;
+            return this;
+        }
+
+        public KafkaSpoutConfig<K, V> build() {
             return new KafkaSpoutConfig<>(this);
         }
     }
 
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, String... topics) {
+        return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+    }
+
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topics to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+        return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+    }
+
+    /**
+     * Factory method that creates a Builder with String key/value deserializers.
+     *
+     * @param bootstrapServers The bootstrap servers for the consumer
+     * @param topics The topic pattern to subscribe to
+     * @return The new builder
+     */
+    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+        return setStringDeserializers(new Builder<String, String>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics));
+    }
+
+    private static Builder<String, String> setStringDeserializers(Builder<String, String> builder) {
+        builder.setProp(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        builder.setProp(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+        return builder;
+    }
+
+    private static void setKafkaPropsForProcessingGuarantee(Builder<?, ?> builder) {
+        if (builder.kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            LOG.warn("The KafkaConsumer " + ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG
+                + " setting is not supported. You can configure similar behavior through KafkaSpoutConfig.Builder.setProcessingGuarantee."
+                + " This will be treated as an error in the next major release.");
+
+            final boolean enableAutoCommit = Boolean.parseBoolean(builder.kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG).toString());
+            if (enableAutoCommit) {
+                builder.processingGuarantee = ProcessingGuarantee.NO_GUARANTEE;
+            } else {
+                builder.processingGuarantee = ProcessingGuarantee.AT_LEAST_ONCE;
+            }
+        }
+        String autoOffsetResetPolicy = (String) builder.kafkaProps.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+        if (builder.processingGuarantee == ProcessingGuarantee.AT_LEAST_ONCE) {
+            if (autoOffsetResetPolicy == null) {
+                /*
+                 * If the user wants to explicitly set an auto offset reset policy, we should respect it, but when the spout is configured
+                 * for at-least-once processing we should default to seeking to the earliest offset in case there's an offset out of range
+                 * error, rather than seeking to the latest (Kafka's default). This type of error will typically happen when the consumer
+                 * requests an offset that was deleted.
+                 */
+                LOG.info("Setting Kafka consumer property '{}' to 'earliest' to ensure at-least-once processing",
+                    ConsumerConfig.AUTO_OFFSET_RESET_CONFIG);
+                builder.kafkaProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+            } else if (!autoOffsetResetPolicy.equals("earliest") && !autoOffsetResetPolicy.equals("none")) {
+                LOG.warn("Cannot guarantee at-least-once processing with an auto.offset.reset policy other than 'earliest' or 'none'."
+                    + " Some messages may be skipped.");
+            }
+        } else if (builder.processingGuarantee == ProcessingGuarantee.AT_MOST_ONCE) {
+            if (autoOffsetResetPolicy != null
+                && (!autoOffsetResetPolicy.equals("latest") && !autoOffsetResetPolicy.equals("none"))) {
+                LOG.warn("Cannot guarantee at-most-once processing with an auto.offset.reset policy other than 'latest' or 'none'."
+                    + " Some messages may be processed more than once.");
+            }
+        }
+        LOG.info("Setting Kafka consumer property '{}' to 'false', because the spout does not support auto-commit",
+            ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
+        builder.kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
+    }
+
+    /**
+     * Gets the properties that will be passed to the KafkaConsumer.
+     *
+     * @return The Kafka properties map
+     */
     public Map<String, Object> getKafkaProps() {
         return kafkaProps;
     }
 
+    /**
+     * @deprecated Please use {@link #getKafkaProps() } and look up the entry for {@link ConsumerConfig#KEY_DESERIALIZER_CLASS_CONFIG} instead
+     */
+    @Deprecated
     public Deserializer<K> getKeyDeserializer() {
-        return keyDeserializer;
+        if (keyDesClazz != null) {
+            try {
+                return keyDesClazz.newInstance();
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new RuntimeException("Could not instantiate key deserializer " + keyDesClazz, e);
+            }
+        }
+        return keyDes;
     }
 
+    /**
+     * @deprecated Please use {@link #getKafkaProps() } and look up the entry for {@link ConsumerConfig#VALUE_DESERIALIZER_CLASS_CONFIG} instead
+     */
+    @Deprecated
     public Deserializer<V> getValueDeserializer() {
-        return valueDeserializer;
+        if (valueDesClazz != null) {
+            try {
+                return valueDesClazz.newInstance();
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new RuntimeException("Could not instantiate value deserializer " + valueDesClazz, e);
+            }
+        }
+        return valueDes;
+    }
+
+    public Subscription getSubscription() {
+        return subscription;
+    }
+
+    public RecordTranslator<K, V> getTranslator() {
+        return translator;
     }
 
     public long getPollTimeoutMs() {
@@ -254,75 +832,71 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return offsetCommitPeriodMs;
     }
 
+    /**
+     * @deprecated Use {@link #getProcessingGuarantee()} instead.
+     */
+    @Deprecated
     public boolean isConsumerAutoCommitMode() {
-        return kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT) == null     // default is true
-                || Boolean.valueOf((String)kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT));
+        return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null // default is false
+            || Boolean.valueOf((String) kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
+    }
+    
+    public ProcessingGuarantee getProcessingGuarantee() {
+        return processingGuarantee;
     }
 
-    public String getConsumerGroupId() {
-        return (String) kafkaProps.get(Consumer.GROUP_ID);
+    public boolean isTupleTrackingEnforced() {
+        return tupleTrackingEnforced;
     }
 
-    /**
-     * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream},
-     * or null if this stream is associated with a wildcard pattern topic
-     */
-    public List<String> getSubscribedTopics() {
-        return kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics ?
-            new ArrayList<>(((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics()) :
-            null;
+    public String getConsumerGroupId() {
+        return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG);
     }
 
-    /**
-     * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null
-     * if this stream is associated with a specific named topic
-     */
-    public Pattern getTopicWildcardPattern() {
-        return kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics ?
-                ((KafkaSpoutStreamsWildcardTopics)kafkaSpoutStreams).getTopicWildcardPattern() :
-                null;
+    public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
+        return firstPollOffsetStrategy;
     }
 
-    public int getMaxTupleRetries() {
-        return maxRetries;
+    public int getMaxUncommittedOffsets() {
+        return maxUncommittedOffsets;
     }
 
-    public FirstPollOffsetStrategy getFirstPollOffsetStrategy() {
-        return firstPollOffsetStrategy;
+    public KafkaSpoutRetryService getRetryService() {
+        return retryService;
     }
 
-    public KafkaSpoutStreams getKafkaSpoutStreams() {
-        return kafkaSpoutStreams;
+    public KafkaTupleListener getTupleListener() {
+        return tupleListener;
     }
 
-    public int getMaxUncommittedOffsets() {
-        return maxUncommittedOffsets;
+    public long getPartitionRefreshPeriodMs() {
+        return partitionRefreshPeriodMs;
     }
 
-    public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() {
-        return tuplesBuilder;
+    public boolean isEmitNullTuples() {
+        return emitNullTuples;
     }
 
-    public KafkaSpoutRetryService getRetryService() {
-        return retryService;
+    public int getMetricsTimeBucketSizeInSecs() {
+        return metricsTimeBucketSizeInSecs;
     }
 
     @Override
     public String toString() {
-        return "KafkaSpoutConfig{" +
-                "kafkaProps=" + kafkaProps +
-                ", keyDeserializer=" + keyDeserializer +
-                ", valueDeserializer=" + valueDeserializer +
-                ", pollTimeoutMs=" + pollTimeoutMs +
-                ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
-                ", maxRetries=" + maxRetries +
-                ", maxUncommittedOffsets=" + maxUncommittedOffsets +
-                ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
-                ", kafkaSpoutStreams=" + kafkaSpoutStreams +
-                ", tuplesBuilder=" + tuplesBuilder +
-                ", retryService=" + retryService +
-                ", topics=" + getSubscribedTopics() +
-                ", topicWildcardPattern=" + getTopicWildcardPattern() +
-                '}';
+        return "KafkaSpoutConfig{"
+            + "kafkaProps=" + kafkaProps
+            + ", key=" + getKeyDeserializer()
+            + ", value=" + getValueDeserializer()
+            + ", pollTimeoutMs=" + pollTimeoutMs
+            + ", offsetCommitPeriodMs=" + offsetCommitPeriodMs
+            + ", maxUncommittedOffsets=" + maxUncommittedOffsets
+            + ", firstPollOffsetStrategy=" + firstPollOffsetStrategy
+            + ", subscription=" + subscription
+            + ", translator=" + translator
+            + ", retryService=" + retryService
+            + ", tupleListener=" + tupleListener
+            + ", processingGuarantee=" + processingGuarantee
+            + ", metricsTimeBucketSizeInSecs=" + metricsTimeBucketSizeInSecs
+            + '}';
     }
 }
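
For readers following this patch, here is a minimal usage sketch of the builder API introduced above. It is illustrative only and not part of the commit; it assumes Java 8, and the class, broker address, topic, group id and field names are placeholders.

    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.ProcessingGuarantee;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class SpoutConfigExample {
        public static KafkaSpoutConfig<String, String> buildConfig() {
            // builder(bootstrapServers, topics...) pre-configures StringDeserializer for keys and values.
            return KafkaSpoutConfig.builder("kafka-broker:9092", "my-topic")
                .setProp(ConsumerConfig.GROUP_ID_CONFIG, "my-group")             // any plain consumer property
                .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                .setProcessingGuarantee(ProcessingGuarantee.AT_LEAST_ONCE)
                .setOffsetCommitPeriodMs(10_000)                                 // commit acked offsets every 10s
                .setRecordTranslator(
                    (ConsumerRecord<String, String> r) -> new Values(r.topic(), r.key(), r.value()),
                    new Fields("topic", "key", "value"))
                .build();
        }
    }

The resulting config would then be handed to the spout constructor, e.g. new KafkaSpout<>(buildConfig()).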

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 71f8327..1626fee 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -18,21 +18,42 @@
 
 package org.apache.storm.kafka.spout;
 
+import java.io.Serializable;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.common.TopicPartition;
 
-public class KafkaSpoutMessageId {
-    private transient TopicPartition topicPart;
-    private transient long offset;
-    private transient int numFails = 0;
+public class KafkaSpoutMessageId implements Serializable {
+    private final TopicPartition topicPart;
+    private final long offset;
+    private int numFails = 0;
+    /**
+     * True if the record was emitted using a form of collector.emit(...); false
+     * if the record is being skipped as a null tuple, as configured by the user in KafkaSpoutConfig.
+     */
+    private boolean emitted;
 
-    public KafkaSpoutMessageId(ConsumerRecord consumerRecord) {
-        this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
+    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
+        this(consumerRecord, true);
+    }
+
+    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord, boolean emitted) {
+        this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset(), emitted);
     }
 
     public KafkaSpoutMessageId(TopicPartition topicPart, long offset) {
+        this(topicPart, offset, true);
+    }
+
+    /**
+     * Creates a new KafkaSpoutMessageId.
+     * @param topicPart The topic partition this message belongs to
+     * @param offset The offset of this message
+     * @param emitted True iff this message is not being skipped as a null tuple
+     */
+    public KafkaSpoutMessageId(TopicPartition topicPart, long offset, boolean emitted) {
         this.topicPart = topicPart;
         this.offset = offset;
+        this.emitted = emitted;
     }
 
     public int partition() {
@@ -59,22 +80,22 @@ public class KafkaSpoutMessageId {
         return topicPart;
     }
 
-    public String getMetadata(Thread currThread) {
-        return "{" +
-                "topic-partition=" + topicPart +
-                ", offset=" + offset +
-                ", numFails=" + numFails +
-                ", thread='" + currThread.getName() + "'" +
-                '}';
+    public boolean isEmitted() {
+        return emitted;
+    }
+
+    public void setEmitted(boolean emitted) {
+        this.emitted = emitted;
     }
 
     @Override
     public String toString() {
-        return "{" +
-                "topic-partition=" + topicPart +
-                ", offset=" + offset +
-                ", numFails=" + numFails +
-                '}';
+        return "{"
+            + "topic-partition=" + topicPart
+            + ", offset=" + offset
+            + ", numFails=" + numFails
+            + ", emitted=" + emitted
+            + '}';
     }
 
     @Override

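As an aside for this diff, the new constructors and the emitted flag on KafkaSpoutMessageId can be exercised as below; the topic name and offsets are made-up illustration values.

    TopicPartition tp = new TopicPartition("my-topic", 0);                    // made-up topic partition
    KafkaSpoutMessageId emittedId = new KafkaSpoutMessageId(tp, 42L);         // emitted defaults to true
    KafkaSpoutMessageId skippedId = new KafkaSpoutMessageId(tp, 43L, false);  // record skipped as a null tuple
    boolean wasEmitted = skippedId.isEmitted();                               // false
    skippedId.setEmitted(true);                                               // flip if the record is later emitted
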
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
index f59367d..68a6f3f 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryExponentialBackoff.java
@@ -25,49 +25,63 @@ import org.slf4j.LoggerFactory;
 import java.io.Serializable;
 import java.util.Collection;
 import java.util.Comparator;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
+import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.utils.Time;
 
 /**
  * Implementation of {@link KafkaSpoutRetryService} using the exponential backoff formula. The time of the nextRetry is set as follows:
- * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod^(failCount-1)    where failCount = 1, 2, 3, ...
+ * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1)    where failCount = 1, 2, 3, ...
  * nextRetry = Min(nextRetry, currentTime + maxDelay)
  */
 public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutRetryExponentialBackoff.class);
     private static final RetryEntryTimeStampComparator RETRY_ENTRY_TIME_STAMP_COMPARATOR = new RetryEntryTimeStampComparator();
 
-    private TimeInterval initialDelay;
-    private TimeInterval delayPeriod;
-    private TimeInterval maxDelay;
-    private int maxRetries;
+    private final TimeInterval initialDelay;
+    private final TimeInterval delayPeriod;
+    private final TimeInterval maxDelay;
+    private final int maxRetries;
 
-    private Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
-    private Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
+    //This class assumes that there is at most one retry schedule per message id in this set at a time.
+    private final Set<RetrySchedule> retrySchedules = new TreeSet<>(RETRY_ENTRY_TIME_STAMP_COMPARATOR);
+    private final Set<KafkaSpoutMessageId> toRetryMsgs = new HashSet<>();      // Convenience data structure to speedup lookups
 
     /**
      * Comparator ordering by timestamp 
      */
     private static class RetryEntryTimeStampComparator implements Serializable, Comparator<RetrySchedule> {
+        @Override
         public int compare(RetrySchedule entry1, RetrySchedule entry2) {
-            return Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+            int result = Long.valueOf(entry1.nextRetryTimeNanos()).compareTo(entry2.nextRetryTimeNanos());
+            
+            if(result == 0) {
+                //TreeSet uses compareTo instead of equals() for the Set contract
+                //Ensure that we can save two retry schedules with the same timestamp
+                result = entry1.hashCode() - entry2.hashCode();
+            }
+            return result;
         }
     }
 
     private class RetrySchedule {
-        private KafkaSpoutMessageId msgId;
+        private final KafkaSpoutMessageId msgId;
         private long nextRetryTimeNanos;
 
-        public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTime) {
+        public RetrySchedule(KafkaSpoutMessageId msgId, long nextRetryTimeNanos) {
             this.msgId = msgId;
-            this.nextRetryTimeNanos = nextRetryTime;
+            this.nextRetryTimeNanos = nextRetryTimeNanos;
             LOG.debug("Created {}", this);
         }
 
-        public void setNextRetryTime() {
+        public void setNextRetryTimeNanos() {
             nextRetryTimeNanos = nextTime(msgId);
             LOG.debug("Updated {}", this);
         }
@@ -80,7 +94,7 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         public String toString() {
             return "RetrySchedule{" +
                     "msgId=" + msgId +
-                    ", nextRetryTime=" + nextRetryTimeNanos +
+                    ", nextRetryTimeNanos=" + nextRetryTimeNanos +
                     '}';
         }
 
@@ -94,20 +108,20 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     }
 
     public static class TimeInterval implements Serializable {
-        private long lengthNanos;
-        private long length;
-        private TimeUnit timeUnit;
+        private final long lengthNanos;
+        private final TimeUnit timeUnit;
+        private final long length;
 
         /**
          * @param length length of the time interval in the units specified by {@link TimeUnit}
          * @param timeUnit the {@link TimeUnit} in which the interval length is expressed
          */
         public TimeInterval(long length, TimeUnit timeUnit) {
-            this.length = length;
-            this.timeUnit = timeUnit;
             this.lengthNanos = timeUnit.toNanos(length);
+            this.timeUnit = timeUnit;
+            this.length = length;
         }
-
+        
         public static TimeInterval seconds(long length) {
             return new TimeInterval(length, TimeUnit.SECONDS);
         }
@@ -115,19 +129,15 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         public static TimeInterval milliSeconds(long length) {
             return new TimeInterval(length, TimeUnit.MILLISECONDS);
         }
-
+        
         public static TimeInterval microSeconds(long length) {
             return new TimeInterval(length, TimeUnit.MICROSECONDS);
         }
-
+        
         public long lengthNanos() {
             return lengthNanos;
         }
-
-        public long length() {
-            return length;
-        }
-
+        
         public TimeUnit timeUnit() {
             return timeUnit;
         }
@@ -144,7 +154,10 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     /**
      * The timestamp of the next retry is scheduled according to the exponential backoff formula (geometric progression):
      * nextRetry = failCount == 1 ? currentTime + initialDelay : currentTime + delayPeriod*2^(failCount-1) where failCount = 1, 2, 3, ...
-     * nextRetry = Min(nextRetry, currentTime + maxDelay)
+     * nextRetry = Min(nextRetry, currentTime + maxDelay).
+     * 
+     * By specifying a value for maxRetries lower than Integer.MAX_VALUE, the user chooses to sacrifice the guarantee of delivery for previously
+     * polled records in favor of processing more records.
      *
      * @param initialDelay      initial delay of the first retry
      * @param delayPeriod       the time interval that is the ratio of the exponential backoff formula (geometric progression)
@@ -157,35 +170,42 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
         this.delayPeriod = delayPeriod;
         this.maxRetries = maxRetries;
         this.maxDelay = maxDelay;
-        LOG.debug("Instantiated {}", this);
+        LOG.debug("Instantiated {}", this.toStringImpl());
     }
 
     @Override
-    public Set<TopicPartition> retriableTopicPartitions() {
-        final Set<TopicPartition> tps = new HashSet<>();
-        final long currentTimeNanos = System.nanoTime();
+    public Map<TopicPartition, Long> earliestRetriableOffsets() {
+        final Map<TopicPartition, Long> tpToEarliestRetriableOffset = new HashMap<>();
+        final long currentTimeNanos = Time.nanoTime();
         for (RetrySchedule retrySchedule : retrySchedules) {
             if (retrySchedule.retry(currentTimeNanos)) {
                 final KafkaSpoutMessageId msgId = retrySchedule.msgId;
-                tps.add(new TopicPartition(msgId.topic(), msgId.partition()));
+                final TopicPartition tpForMessage = new TopicPartition(msgId.topic(), msgId.partition());
+                final Long currentLowestOffset = tpToEarliestRetriableOffset.get(tpForMessage);
+                if(currentLowestOffset != null) {
+                    tpToEarliestRetriableOffset.put(tpForMessage, Math.min(currentLowestOffset, msgId.offset()));
+                } else {
+                    tpToEarliestRetriableOffset.put(tpForMessage, msgId.offset());
+                }
             } else {
                 break;  // Stop searching as soon as passed current time
             }
         }
-        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tps);
-        return tps;
+        LOG.debug("Topic partitions with entries ready to be retried [{}] ", tpToEarliestRetriableOffset);
+        return tpToEarliestRetriableOffset;
     }
 
     @Override
     public boolean isReady(KafkaSpoutMessageId msgId) {
         boolean retry = false;
-        if (toRetryMsgs.contains(msgId)) {
-            final long currentTimeNanos = System.nanoTime();
+        if (isScheduled(msgId)) {
+            final long currentTimeNanos = Time.nanoTime();
             for (RetrySchedule retrySchedule : retrySchedules) {
                 if (retrySchedule.retry(currentTimeNanos)) {
                     if (retrySchedule.msgId.equals(msgId)) {
                         retry = true;
                         LOG.debug("Found entry to retry {}", retrySchedule);
+                        break; //Stop searching if the message is known to be ready for retry
                     }
                 } else {
                     LOG.debug("Entry to retry not found {}", retrySchedule);
@@ -204,14 +224,14 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     @Override
     public boolean remove(KafkaSpoutMessageId msgId) {
         boolean removed = false;
-        if (toRetryMsgs.contains(msgId)) {
+        if (isScheduled(msgId)) {
+            toRetryMsgs.remove(msgId);
             for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
                 final RetrySchedule retrySchedule = iterator.next();
                 if (retrySchedule.msgId().equals(msgId)) {
                     iterator.remove();
-                    toRetryMsgs.remove(msgId);
                     removed = true;
-                    break;
+                    break; //There is at most one schedule per message id
                 }
             }
         }
@@ -239,38 +259,66 @@ public class KafkaSpoutRetryExponentialBackoff implements KafkaSpoutRetryService
     }
 
     @Override
-    public void schedule(KafkaSpoutMessageId msgId) {
+    public boolean schedule(KafkaSpoutMessageId msgId) {
         if (msgId.numFails() > maxRetries) {
             LOG.debug("Not scheduling [{}] because reached maximum number of retries [{}].", msgId, maxRetries);
+            return false;
         } else {
-            if (toRetryMsgs.contains(msgId)) {
-                for (Iterator<RetrySchedule> iterator = retrySchedules.iterator(); iterator.hasNext(); ) {
-                    final RetrySchedule retrySchedule = iterator.next();
-                    if (retrySchedule.msgId().equals(msgId)) {
-                        iterator.remove();
-                        toRetryMsgs.remove(msgId);
-                    }
-                }
-            }
+            //Remove existing schedule for the message id
+            remove(msgId);
             final RetrySchedule retrySchedule = new RetrySchedule(msgId, nextTime(msgId));
             retrySchedules.add(retrySchedule);
             toRetryMsgs.add(msgId);
             LOG.debug("Scheduled. {}", retrySchedule);
             LOG.trace("Current state {}", retrySchedules);
+            return true;
+        }
+    }
+    
+    @Override
+    public int readyMessageCount() {
+        int count = 0;
+        final long currentTimeNanos = Time.nanoTime();
+        for (RetrySchedule retrySchedule : retrySchedules) {
+            if (retrySchedule.retry(currentTimeNanos)) {
+                ++count;
+            } else {
+                break; //Stop counting when past current time
+            }
         }
+        return count;
+    }
+
+    @Override
+    public KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record) {
+        KafkaSpoutMessageId msgId = new KafkaSpoutMessageId(record);
+        if (isScheduled(msgId)) {
+            for (KafkaSpoutMessageId originalMsgId : toRetryMsgs) {
+                if (originalMsgId.equals(msgId)) {
+                    return originalMsgId;
+                }
+            }
+        }
+        return msgId;
     }
 
     // if value is greater than Long.MAX_VALUE it truncates to Long.MAX_VALUE
     private long nextTime(KafkaSpoutMessageId msgId) {
-        final long currentTimeNanos = System.nanoTime();
+        Validate.isTrue(msgId.numFails() > 0, "nextTime assumes the message has failed at least once");
+        final long currentTimeNanos = Time.nanoTime();
         final long nextTimeNanos = msgId.numFails() == 1                // numFails = 1, 2, 3, ...
                 ? currentTimeNanos + initialDelay.lengthNanos
-                : (currentTimeNanos + delayPeriod.timeUnit.toNanos((long) Math.pow(delayPeriod.length, msgId.numFails() - 1)));
+                : currentTimeNanos + delayPeriod.lengthNanos * (long)(Math.pow(2, msgId.numFails()-1));
         return Math.min(nextTimeNanos, currentTimeNanos + maxDelay.lengthNanos);
     }
 
     @Override
     public String toString() {
+        return toStringImpl();
+    }
+    
+    private String toStringImpl() {
+        //This is here to avoid an overridable call in the constructor
         return "KafkaSpoutRetryExponentialBackoff{" +
                 "delay=" + initialDelay +
                 ", ratio=" + delayPeriod +

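To make the corrected backoff formula concrete, here is a worked example with made-up interval values: with initialDelay = 0 s, delayPeriod = 2 s and maxDelay = 10 s, successive failures of the same message are delayed 0 s, 4 s (2*2^1), 8 s (2*2^2) and then 10 s for every later failure, since 2*2^3 = 16 s exceeds maxDelay.

    // Worked example with made-up values; mirrors nextTime(...) in the hunk above.
    // The constructor parameter order (initialDelay, delayPeriod, maxRetries, maxDelay) is assumed here.
    KafkaSpoutRetryService retry = new KafkaSpoutRetryExponentialBackoff(
        KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(0),    // initialDelay: first retry is immediate
        KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(2),    // delayPeriod: base of the geometric progression
        Integer.MAX_VALUE,                                            // maxRetries: retry forever
        KafkaSpoutRetryExponentialBackoff.TimeInterval.seconds(10));  // maxDelay: cap every delay at 10 seconds
    // Resulting delays by failure count: 0s, 4s, 8s, then 10s thereafter.
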
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
index 5aab167..12d26da 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutRetryService.java
@@ -18,25 +18,30 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.common.TopicPartition;
 
 import java.io.Serializable;
 import java.util.Collection;
-import java.util.Set;
+import java.util.Map;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.TopicPartition;
 
 /**
  * Represents the logic that manages the retrial of failed tuples.
  */
 public interface KafkaSpoutRetryService extends Serializable {
     /**
-     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or updates retry time if it has already been scheduled.
+     * Schedules this {@link KafkaSpoutMessageId} if not yet scheduled, or
+     * updates retry time if it has already been scheduled. It may also indicate
+     * that the message should not be retried, in which case the message will not be scheduled.
      * @param msgId message to schedule for retrial
+     * @return true if the message will be retried, false otherwise
      */
-    void schedule(KafkaSpoutMessageId msgId);
+    boolean schedule(KafkaSpoutMessageId msgId);
 
     /**
      * Removes a message from the list of messages scheduled for retrial
      * @param msgId message to remove from retrial
+     * @return true if the message was scheduled for retrial, false otherwise
      */
     boolean remove(KafkaSpoutMessageId msgId);
 
@@ -50,14 +55,16 @@ public interface KafkaSpoutRetryService extends Serializable {
     boolean retainAll(Collection<TopicPartition> topicPartitions);
 
     /**
-     * @return set of topic partitions that have offsets that are ready to be retried, i.e.,
-     * for which a tuple has failed and has retry time less than current time
+     * @return The earliest retriable offset for each TopicPartition that has
+     * offsets ready to be retried, i.e. for which a tuple has failed
+     * and has retry time less than current time
      */
-    Set<TopicPartition> retriableTopicPartitions();
+    Map<TopicPartition, Long> earliestRetriableOffsets();
 
     /**
-     * Checks if a specific failed {@link KafkaSpoutMessageId} is is ready to be retried,
+     * Checks if a specific failed {@link KafkaSpoutMessageId} is ready to be retried,
      * i.e. is scheduled and has a retry time that is less than the current time.
+     * @param msgId message to check for readiness
      * @return true if message is ready to be retried, false otherwise
      */
     boolean isReady(KafkaSpoutMessageId msgId);
@@ -65,8 +72,23 @@ public interface KafkaSpoutRetryService extends Serializable {
     /**
      * Checks if a specific failed {@link KafkaSpoutMessageId} is scheduled to be retried.
      * The message may or may not be ready to be retried yet.
+     * @param msgId message to check for scheduling status
      * @return true if the message is scheduled to be retried, regardless of whether or not it is ready to be retried.
      * Returns false if this message is not scheduled for retrial
      */
     boolean isScheduled(KafkaSpoutMessageId msgId);
+
+    /**
+     * @return The number of messages that are ready for retry
+     */
+    int readyMessageCount();
+
+    /**
+     * Gets the {@link KafkaSpoutMessageId} for the given record.
+     *
+     * @param record The record to fetch the id for
+     * @return The id the record was scheduled for retry with, or a new {@link KafkaSpoutMessageId} if the record was not scheduled for
+     * retry.
+     */
+    KafkaSpoutMessageId getMessageId(ConsumerRecord<?, ?> record);
 }

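The interface now returns enough information for the spout to drive seeks and retries itself. A rough caller-side sketch is below; the onFail/onAck/beforePoll wiring and the consumer variable are hypothetical, but only methods declared in KafkaSpoutRetryService and the standard KafkaConsumer.seek(...) are invoked.

    // Hypothetical wiring; assumes imports of KafkaConsumer, TopicPartition and java.util.Map.
    void onFail(KafkaSpoutRetryService retryService, KafkaSpoutMessageId msgId) {
        boolean willRetry = retryService.schedule(msgId);    // false once maxRetries has been exceeded
    }

    void onAck(KafkaSpoutRetryService retryService, KafkaSpoutMessageId msgId) {
        retryService.remove(msgId);                           // drop any pending retry schedule
    }

    void beforePoll(KafkaSpoutRetryService retryService, KafkaConsumer<?, ?> consumer) {
        // Seek each partition back to its earliest retriable offset so failed records are polled again.
        for (Map.Entry<TopicPartition, Long> e : retryService.earliestRetriableOffsets().entrySet()) {
            consumer.seek(e.getKey(), e.getValue());
        }
    }
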
http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
deleted file mode 100644
index 5375f6c..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.regex.Pattern;
-
-/**
- * Represents the stream and output fields used by a topic
- */
-public class KafkaSpoutStream implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStream.class);
-
-    private final Fields outputFields;
-    private final String streamId;
-    private final String topic;
-    private Pattern topicWildcardPattern;
-
-    /** Represents the specified outputFields and topic with the default stream */
-    public KafkaSpoutStream(Fields outputFields, String topic) {
-        this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
-    }
-
-    /** Represents the specified outputFields and topic with the specified stream */
-    public KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
-        if (outputFields == null || streamId == null || topic == null) {
-            throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
-                    "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic));
-        }
-        this.outputFields = outputFields;
-        this.streamId = streamId;
-        this.topic = topic;
-        this.topicWildcardPattern = null;
-    }
-
-    /** Represents the specified outputFields and topic wild card with the default stream */
-    KafkaSpoutStream(Fields outputFields, Pattern topicWildcardPattern) {
-        this(outputFields, Utils.DEFAULT_STREAM_ID, topicWildcardPattern);
-    }
-
-    /** Represents the specified outputFields and topic wild card with the specified stream */
-    public KafkaSpoutStream(Fields outputFields, String streamId, Pattern topicWildcardPattern) {
-
-        if (outputFields == null || streamId == null || topicWildcardPattern == null) {
-            throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
-                    "[outputFields=%s, streamId=%s, topicWildcardPattern=%s]", outputFields, streamId, topicWildcardPattern));
-        }
-        this.outputFields = outputFields;
-        this.streamId = streamId;
-        this.topic = null;
-        this.topicWildcardPattern = topicWildcardPattern;
-    }
-
-    public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
-        collector.emit(streamId, tuple, messageId);
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        LOG.info("Declared [streamId = {}], [outputFields = {}] for [topic = {}]", streamId, outputFields, topic);
-        declarer.declareStream(streamId, outputFields);
-    }
-
-
-    public Fields getOutputFields() {
-        return outputFields;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    /**
-     * @return the topic associated with this {@link KafkaSpoutStream}, or null
-     * if this stream is associated with a wildcard pattern topic
-     */
-    public String getTopic() {
-        return topic;
-    }
-
-    /**
-     * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null
-     * if this stream is associated with a specific named topic
-     */
-    public Pattern getTopicWildcardPattern() {
-        return topicWildcardPattern;
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutStream{" +
-                "outputFields=" + outputFields +
-                ", streamId='" + streamId + '\'' +
-                ", topic='" + topic + '\'' +
-                ", topicWildcardPattern=" + topicWildcardPattern +
-                '}';
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
deleted file mode 100644
index 6910d3c..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Represents the {@link KafkaSpoutStream} associated with each topic or topic pattern (wildcard), and provides
- * a public API to declare output streams and emmit tuples, on the appropriate stream, for all the topics specified.
- */
-public interface KafkaSpoutStreams extends Serializable {
-    void declareOutputFields(OutputFieldsDeclarer declarer);
-
-    void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId);
-}