Posted to commits@storm.apache.org by bo...@apache.org on 2017/02/01 15:56:27 UTC

[4/8] storm git commit: STORM-1997: copy state/bolt from storm-kafka to storm-kafka-client STORM-2225: change spout config to be simpler. STORM-2228: removed ability to request a single topic go to multiple streams STORM-2236: Reimplemented manual partit
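
For anyone reviewing the API change as a whole: the KafkaSpoutStreams / KafkaSpoutTuplesBuilder pair is replaced by a fluent Builder plus a RecordTranslator. A minimal sketch of topology code against the new API follows; the broker address, topic, group id, and field names are illustrative only, and it assumes Func is a single-method interface so a lambda can stand in for it:

    import java.util.Arrays;
    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.tuple.Fields;

    public class NewStyleSpoutExample {
        public static KafkaSpout<String, String> buildSpout() {
            KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
                    .builder("localhost:9092", "my-topic")   // bootstrap servers + named topic (illustrative)
                    .setGroupId("my-group")                  // consumer group.id (illustrative)
                    // a single translator replaces the per-topic streams and tuple builders
                    .setRecordTranslator(
                            r -> Arrays.<Object>asList(r.key(), r.value()),
                            new Fields("key", "value"))
                    .build();
            return new KafkaSpout<>(conf);
        }
    }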

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
index 737c810..e0a3451 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpout.java
@@ -18,21 +18,11 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.clients.consumer.ConsumerRecords;
-import org.apache.kafka.clients.consumer.KafkaConsumer;
-import org.apache.kafka.clients.consumer.OffsetAndMetadata;
-import org.apache.kafka.common.TopicPartition;
-import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseRichSpout;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
@@ -46,18 +36,28 @@ import java.util.NavigableSet;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.TimeUnit;
-import java.util.regex.Pattern;
 
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.clients.consumer.OffsetAndMetadata;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.errors.InterruptException;
+import org.apache.kafka.common.errors.RetriableException;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactoryDefault;
-
-import org.apache.kafka.common.errors.InterruptException;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class KafkaSpout<K, V> extends BaseRichSpout {
+    private static final long serialVersionUID = 4151921085047987154L;
     private static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
     private static final Comparator<KafkaSpoutMessageId> OFFSET_COMPARATOR = new OffsetComparator();
 
@@ -79,13 +79,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private transient boolean initialized;                              // Flag indicating that the spout is still undergoing initialization process.
     // Initialization is only complete after the first call to  KafkaSpoutConsumerRebalanceListener.onPartitionsAssigned()
 
-    private KafkaSpoutStreams kafkaSpoutStreams;                        // Object that wraps all the logic to declare output fields and emit tuples
-    private transient KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;      // Object that contains the logic to build tuples for each ConsumerRecord
-
     transient Map<TopicPartition, OffsetEntry> acked;           // Tuples that were successfully acked. These tuples will be committed periodically when the commit timer expires, after consumer rebalance, or on close/deactivate
     private transient Set<KafkaSpoutMessageId> emitted;                 // Tuples that have been emitted but that are "on the wire", i.e. pending being acked or failed
     private transient Iterator<ConsumerRecord<K, V>> waitingToEmit;         // Records that have been polled and are queued to be emitted in the nextTuple() call. One record is emitted per nextTuple()
     private transient long numUncommittedOffsets;                       // Number of offsets that have been polled and emitted but not yet been committed
+    private transient TopologyContext context;
+    private transient Timer refreshSubscriptionTimer;                   // Determines when the subscription should be refreshed
 
 
     public KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
@@ -95,13 +94,13 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     //This constructor is here for testing
     KafkaSpout(KafkaSpoutConfig<K, V> kafkaSpoutConfig, KafkaConsumerFactory<K, V> kafkaConsumerFactory) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;                 // Pass in configuration
-        this.kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
         this.kafkaConsumerFactory = kafkaConsumerFactory;
     }
 
     @Override
     public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
         initialized = false;
+        this.context = context;
 
         // Spout internals
         this.collector = collector;
@@ -115,12 +114,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         // Retries management
         retryService = kafkaSpoutConfig.getRetryService();
 
-        // Tuples builder delegate
-        tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
-
         if (!consumerAutoCommitMode) {     // If it is auto commit, no need to commit offsets manually
             commitTimer = new Timer(500, kafkaSpoutConfig.getOffsetsCommitPeriodMs(), TimeUnit.MILLISECONDS);
         }
+        refreshSubscriptionTimer = new Timer(500, kafkaSpoutConfig.getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
 
         acked = new HashMap<>();
         emitted = new HashSet<>();
@@ -183,10 +180,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             long fetchOffset;
             if (committedOffset != null) {             // offset was committed for this TopicPartition
                 if (firstPollOffsetStrategy.equals(EARLIEST)) {
-                    kafkaConsumer.seekToBeginning(toArrayList(tp));
+                    kafkaConsumer.seekToBeginning(Collections.singleton(tp));
                     fetchOffset = kafkaConsumer.position(tp);
                 } else if (firstPollOffsetStrategy.equals(LATEST)) {
-                    kafkaConsumer.seekToEnd(toArrayList(tp));
+                    kafkaConsumer.seekToEnd(Collections.singleton(tp));
                     fetchOffset = kafkaConsumer.position(tp);
                 } else {
                     // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
@@ -195,9 +192,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 }
             } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
                 if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
-                    kafkaConsumer.seekToBeginning(toArrayList(tp));
+                    kafkaConsumer.seekToBeginning(Collections.singleton(tp));
                 } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
-                    kafkaConsumer.seekToEnd(toArrayList(tp));
+                    kafkaConsumer.seekToEnd(Collections.singleton(tp));
                 }
                 fetchOffset = kafkaConsumer.position(tp);
             }
@@ -205,10 +202,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
-    private Collection<TopicPartition> toArrayList(final TopicPartition tp) {
-        return new ArrayList<TopicPartition>(1){{add(tp);}};
-    }
-
     private void setAcked(TopicPartition tp, long fetchOffset) {
         // If this partition was previously assigned to this spout, leave the acked offsets as they were to resume where it left off
         if (!consumerAutoCommitMode && !acked.containsKey(tp)) {
@@ -227,7 +220,11 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                 }
 
                 if (poll()) {
-                    setWaitingToEmit(pollKafkaBroker());
+                    try {
+                        setWaitingToEmit(pollKafkaBroker());
+                    } catch (RetriableException e) {
+                        LOG.error("Failed to poll from Kafka.", e);
+                    }
                 }
 
                 if (waitingToEmit()) {
@@ -282,7 +279,9 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     // ======== poll =========
     private ConsumerRecords<K, V> pollKafkaBroker() {
         doSeekRetriableTopicPartitions();
-
+        if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+            kafkaSpoutConfig.getSubscription().refreshAssignment();
+        }
         final ConsumerRecords<K, V> consumerRecords = kafkaConsumer.poll(kafkaSpoutConfig.getPollTimeoutMs());
         final int numPolledRecords = consumerRecords.count();
         LOG.debug("Polled [{}] records from Kafka. [{}] uncommitted offsets across all topic partitions", numPolledRecords, numUncommittedOffsets);
@@ -309,7 +308,6 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
     }
 
-
     //Emits one tuple per record
     //@return true if tuple was emitted
     private boolean emitTupleIfNotEmitted(ConsumerRecord<K, V> record) {
@@ -323,8 +321,12 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         } else {
             boolean isScheduled = retryService.isScheduled(msgId);
             if (!isScheduled || retryService.isReady(msgId)) {   // not scheduled <=> never failed (i.e. never emitted) or ready to be retried
-                final List<Object> tuple = tuplesBuilder.buildTuple(record);
-                kafkaSpoutStreams.emit(collector, tuple, msgId);
+                final List<Object> tuple = kafkaSpoutConfig.getTranslator().apply(record);
+                if (tuple instanceof KafkaTuple) {
+                    collector.emit(((KafkaTuple)tuple).getStream(), tuple, msgId);
+                } else {
+                    collector.emit(tuple, msgId);
+                }
                 emitted.add(msgId);
                 numUncommittedOffsets++;
                 if (isScheduled) { // Was scheduled for retry, now being re-emitted. Remove from schedule.
@@ -411,18 +413,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
     private void subscribeKafkaConsumer() {
         kafkaConsumer = kafkaConsumerFactory.createConsumer(kafkaSpoutConfig);
 
-        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
-            final List<String> topics = ((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics();
-            kafkaConsumer.subscribe(topics, new KafkaSpoutConsumerRebalanceListener());
-            LOG.info("Kafka consumer subscribed topics {}", topics);
-        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
-            final Pattern pattern = ((KafkaSpoutStreamsWildcardTopics) kafkaSpoutStreams).getTopicWildcardPattern();
-            kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener());
-            LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
-        }
-        // Initial poll to get the consumer registration process going.
-        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
-        kafkaConsumer.poll(0);
+        kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
     }
 
     @Override
@@ -456,7 +447,10 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
 
     @Override
     public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        kafkaSpoutStreams.declareOutputFields(declarer);
+        RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+        for (String stream: translator.streams()) {
+            declarer.declareStream(stream, translator.getFieldsFor(stream));
+        }
     }
 
     @Override
@@ -475,11 +469,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         }
         String configKeyPrefix = "config.";
 
-        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
-            configuration.put(configKeyPrefix + "topics", getNamedTopics());
-        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
-            configuration.put(configKeyPrefix + "topics", getWildCardTopics());
-        }
+        configuration.put(configKeyPrefix + "topics", getTopicsString());
 
         configuration.put(configKeyPrefix + "groupid", kafkaSpoutConfig.getConsumerGroupId());
         configuration.put(configKeyPrefix + "bootstrap.servers", kafkaSpoutConfig.getKafkaProps().get("bootstrap.servers"));
@@ -487,16 +477,8 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
         return configuration;
     }
 
-    private String getNamedTopics() {
-        StringBuilder topics = new StringBuilder();
-        for (String topic: kafkaSpoutConfig.getSubscribedTopics()) {
-            topics.append(topic).append(",");
-        }
-        return topics.toString();
-    }
-
-    private String getWildCardTopics() {
-        return kafkaSpoutConfig.getTopicWildcardPattern().toString();
+    private String getTopicsString() {
+        return kafkaSpoutConfig.getSubscription().getTopicsString();
     }
 
     // ======= Offsets Commit Management ==========
@@ -600,7 +582,7 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
             return ackedMsgs.isEmpty();
         }
 
-        public boolean contains(ConsumerRecord record) {
+        public boolean contains(ConsumerRecord<K, V> record) {
             return contains(new KafkaSpoutMessageId(record));
         }
 
@@ -618,60 +600,4 @@ public class KafkaSpout<K, V> extends BaseRichSpout {
                     '}';
         }
     }
-
-    // =========== Timer ===========
-
-    private class Timer {
-        private final long delay;
-        private final long period;
-        private final TimeUnit timeUnit;
-        private final long periodNanos;
-        private long start;
-
-        /**
-         * Creates a class that mimics a single threaded timer that expires periodically. If a call to {@link
-         * #isExpiredResetOnTrue()} occurs later than {@code period} since the timer was initiated or reset, this method returns
-         * true. Each time the method returns true the counter is reset. The timer starts with the specified time delay.
-         *
-         * @param delay    the initial delay before the timer starts
-         * @param period   the period between calls {@link #isExpiredResetOnTrue()}
-         * @param timeUnit the time unit of delay and period
-         */
-        public Timer(long delay, long period, TimeUnit timeUnit) {
-            this.delay = delay;
-            this.period = period;
-            this.timeUnit = timeUnit;
-
-            periodNanos = timeUnit.toNanos(period);
-            start = System.nanoTime() + timeUnit.toNanos(delay);
-        }
-
-        public long period() {
-            return period;
-        }
-
-        public long delay() {
-            return delay;
-        }
-
-        public TimeUnit getTimeUnit() {
-            return timeUnit;
-        }
-
-        /**
-         * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
-         * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
-         * (re-initiated) and a new cycle will start.
-         *
-         * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
-         * otherwise.
-         */
-        public boolean isExpiredResetOnTrue() {
-            final boolean expired = System.nanoTime() - start > periodNanos;
-            if (expired) {
-                start = System.nanoTime();
-            }
-            return expired;
-        }
-    }
 }
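
A note on the new emit path above: nextTuple() now asks the configured RecordTranslator for the tuple, and if the returned List is a KafkaTuple it is emitted on KafkaTuple.getStream(), otherwise on the default stream; declareOutputFields() likewise iterates translator.streams(). A hedged sketch of a custom translator that routes records to a named stream; it assumes KafkaTuple offers a varargs constructor plus a routedTo(stream) method and that streams() returns a List of stream names, none of which is shown in this part of the patch:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.KafkaTuple;
    import org.apache.storm.kafka.spout.RecordTranslator;
    import org.apache.storm.tuple.Fields;

    // Illustrative translator that emits every record on an "updates" stream.
    public class StreamRoutingTranslator implements RecordTranslator<String, String> {
        @Override
        public List<Object> apply(ConsumerRecord<String, String> record) {
            // A KafkaTuple is a tuple that remembers its target stream, so
            // emitTupleIfNotEmitted() can pick the stream off the returned value.
            return new KafkaTuple(record.key(), record.value()).routedTo("updates");
        }

        @Override
        public Fields getFieldsFor(String stream) {
            return new Fields("key", "value");
        }

        @Override
        public List<String> streams() {
            return Arrays.asList("updates");
        }
    }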

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
index 8aa525b..db07fda 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutConfig.java
@@ -18,34 +18,40 @@
 
 package org.apache.storm.kafka.spout;
 
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.kafka.common.serialization.Deserializer;
-import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
-
 import java.io.Serializable;
-import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Properties;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.serialization.Deserializer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryExponentialBackoff.TimeInterval;
+import org.apache.storm.tuple.Fields;
+
 /**
  * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
  */
 public class KafkaSpoutConfig<K, V> implements Serializable {
+    private static final long serialVersionUID = 141902646130682494L;
     public static final long DEFAULT_POLL_TIMEOUT_MS = 200;            // 200ms
     public static final long DEFAULT_OFFSET_COMMIT_PERIOD_MS = 30_000;   // 30s
     public static final int DEFAULT_MAX_RETRIES = Integer.MAX_VALUE;     // Retry forever
     public static final int DEFAULT_MAX_UNCOMMITTED_OFFSETS = 10_000_000;    // 10,000,000 records => 80MBs of memory footprint in the worst case
-
-    // Kafka property names
-    public interface Consumer {
-        String GROUP_ID = "group.id";
-        String BOOTSTRAP_SERVERS = "bootstrap.servers";
-        String ENABLE_AUTO_COMMIT = "enable.auto.commit";
-        String AUTO_COMMIT_INTERVAL_MS = "auto.commit.interval.ms";
-        String KEY_DESERIALIZER = "key.deserializer";
-        String VALUE_DESERIALIZER = "value.deserializer";
-    }
+    public static final long DEFAULT_PARTITION_REFRESH_PERIOD_MS = 2_000; // 2s
+    public static final KafkaSpoutRetryService DEFAULT_RETRY_SERVICE =  
+            new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
+                    DEFAULT_MAX_RETRIES, TimeInterval.seconds(10));
+    /**
+     * Retry in a tight loop (keeps unit tests fast); do not use in production.
+     */
+    public static final KafkaSpoutRetryService UNIT_TEST_RETRY_SERVICE =
+            new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(0),
+                    DEFAULT_MAX_RETRIES, TimeInterval.milliSeconds(0));
 
     /**
      * The offset used by the Kafka spout in the first poll to Kafka broker. The choice of this parameter will
@@ -60,123 +66,254 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
      * If no offset has been committed, it behaves as LATEST.</li>
      * </ul>
      * */
-    public enum FirstPollOffsetStrategy {
+    public static enum FirstPollOffsetStrategy {
         EARLIEST,
         LATEST,
         UNCOMMITTED_EARLIEST,
         UNCOMMITTED_LATEST }
-
-    // Kafka consumer configuration
-    private final Map<String, Object> kafkaProps;
-    private final Deserializer<K> keyDeserializer;
-    private final Deserializer<V> valueDeserializer;
-    private final long pollTimeoutMs;
-
-    // Kafka spout configuration
-    private final long offsetCommitPeriodMs;
-    private final int maxRetries;
-    private final int maxUncommittedOffsets;
-    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
-    private final KafkaSpoutStreams kafkaSpoutStreams;
-    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-    private final KafkaSpoutRetryService retryService;
-
-    private KafkaSpoutConfig(Builder<K,V> builder) {
-        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
-        this.keyDeserializer = builder.keyDeserializer;
-        this.valueDeserializer = builder.valueDeserializer;
-        this.pollTimeoutMs = builder.pollTimeoutMs;
-        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
-        this.maxRetries = builder.maxRetries;
-        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
-        this.kafkaSpoutStreams = builder.kafkaSpoutStreams;
-        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
-        this.tuplesBuilder = builder.tuplesBuilder;
-        this.retryService = builder.retryService;
+    
+    public static Builder<String, String> builder(String bootstrapServers, String ... topics) {
+        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
     }
-
-    private Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
+    
+    public static Builder<String, String> builder(String bootstrapServers, Collection<String> topics) {
+        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
+    }
+    
+    public static Builder<String, String> builder(String bootstrapServers, Pattern topics) {
+        return new Builder<>(bootstrapServers, StringDeserializer.class, StringDeserializer.class, topics);
+    }
+    
+    private static Map<String, Object> setDefaultsAndGetKafkaProps(Map<String, Object> kafkaProps) {
         // set defaults for properties not specified
-        if (!kafkaProps.containsKey(Consumer.ENABLE_AUTO_COMMIT)) {
-            kafkaProps.put(Consumer.ENABLE_AUTO_COMMIT, "false");
+        if (!kafkaProps.containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
+            kafkaProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
         }
         return kafkaProps;
     }
-
+    
     public static class Builder<K,V> {
         private final Map<String, Object> kafkaProps;
-        private SerializableDeserializer<K> keyDeserializer;
-        private SerializableDeserializer<V> valueDeserializer;
+        private Subscription subscription;
+        private final SerializableDeserializer<K> keyDes;
+        private final Class<? extends Deserializer<K>> keyDesClazz;
+        private final SerializableDeserializer<V> valueDes;
+        private final Class<? extends Deserializer<V>> valueDesClazz;
+        private RecordTranslator<K, V> translator;
         private long pollTimeoutMs = DEFAULT_POLL_TIMEOUT_MS;
         private long offsetCommitPeriodMs = DEFAULT_OFFSET_COMMIT_PERIOD_MS;
         private int maxRetries = DEFAULT_MAX_RETRIES;
         private FirstPollOffsetStrategy firstPollOffsetStrategy = FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-        private final KafkaSpoutStreams kafkaSpoutStreams;
         private int maxUncommittedOffsets = DEFAULT_MAX_UNCOMMITTED_OFFSETS;
-        private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
-        private final KafkaSpoutRetryService retryService;
+        private KafkaSpoutRetryService retryService = DEFAULT_RETRY_SERVICE;
+        private long partitionRefreshPeriodMs = DEFAULT_PARTITION_REFRESH_PERIOD_MS;
+        
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, String ... topics) {
+            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+        }
+        
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Collection<String> topics) {
+            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+        }
+        
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Pattern topics) {
+            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
+        }
+        
+        public Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, SerializableDeserializer<V> valDes, Subscription subscription) {
+            this(bootstrapServers, keyDes, null, valDes, null, subscription);
+        }
+        
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, String ... topics) {
+            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+        }
+        
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Collection<String> topics) {
+            this(bootstrapServers, keyDes, valDes, new NamedSubscription(topics));
+        }
+        
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Pattern topics) {
+            this(bootstrapServers, keyDes, valDes, new PatternSubscription(topics));
+        }
+        
+        public Builder(String bootstrapServers, Class<? extends Deserializer<K>> keyDes, Class<? extends Deserializer<V>> valDes, Subscription subscription) {
+            this(bootstrapServers, null, keyDes, null, valDes, subscription);
+        }
+        
+        private Builder(String bootstrapServers, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+                SerializableDeserializer<V> valDes, Class<? extends Deserializer<V>> valDesClazz, Subscription subscription) {
+            kafkaProps = new HashMap<>();
+            if (bootstrapServers == null || bootstrapServers.isEmpty()) {
+                throw new IllegalArgumentException("bootstrap servers cannot be null or empty");
+            }
+            kafkaProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+            this.keyDes = keyDes;
+            this.keyDesClazz = keyDesClazz;
+            this.valueDes = valDes;
+            this.valueDesClazz = valDesClazz;
+            this.subscription = subscription;
+            this.translator = new DefaultRecordTranslator<K,V>();
+        }
+
+        private Builder(Builder<?, ?> builder, SerializableDeserializer<K> keyDes, Class<? extends Deserializer<K>> keyDesClazz,
+                SerializableDeserializer<V> valueDes, Class<? extends Deserializer<V>> valueDesClazz) {
+            this.kafkaProps = new HashMap<>(builder.kafkaProps);
+            this.subscription = builder.subscription;
+            this.pollTimeoutMs = builder.pollTimeoutMs;
+            this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+            this.maxRetries = builder.maxRetries;
+            this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+            this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+            //This could result in a lot of class cast exceptions at runtime,
+            // but because some translators will work no matter what the generics
+            // are, it is best not to force someone to reset the translator
+            // when they change the key/value types.
+            this.translator = (RecordTranslator<K, V>) builder.translator;
+            this.retryService = builder.retryService;
+            this.keyDes = keyDes;
+            this.keyDesClazz = keyDesClazz;
+            this.valueDes = valueDes;
+            this.valueDesClazz = valueDesClazz;
+        }
 
         /**
-         * Please refer to javadoc in {@link #Builder(Map, KafkaSpoutStreams, KafkaSpoutTuplesBuilder, KafkaSpoutRetryService)}.<p/>
-         * This constructor uses by the default the following implementation for {@link KafkaSpoutRetryService}:<p/>
-         * {@code new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
-         *           DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)))}
+         * Specifying this key deserializer overrides the property key.deserializer. If you have
+         * set a custom RecordTranslator before calling this, it may result in class cast
+         * exceptions at runtime.
          */
-        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
-                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder) {
-            this(kafkaProps, kafkaSpoutStreams, tuplesBuilder,
-                    new KafkaSpoutRetryExponentialBackoff(TimeInterval.seconds(0), TimeInterval.milliSeconds(2),
-                            DEFAULT_MAX_RETRIES, TimeInterval.seconds(10)));
+        public <NK> Builder<NK,V> setKey(SerializableDeserializer<NK> keyDeserializer) {
+            return new Builder<>(this, keyDeserializer, null, valueDes, valueDesClazz);
         }
-
-        /***
-         * KafkaSpoutConfig defines the required configuration to connect a consumer to a consumer group, as well as the subscribing topics
-         * The optional configuration can be specified using the set methods of this builder
-         * @param kafkaProps    properties defining consumer connection to Kafka broker as specified in @see <a href="http://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html">KafkaConsumer</a>
-         * @param kafkaSpoutStreams    streams to where the tuples are emitted for each tuple. Multiple topics can emit in the same stream.
-         * @param tuplesBuilder logic to build tuples from {@link ConsumerRecord}s.
-         * @param retryService  logic that manages the retrial of failed tuples
+        
+        /**
+         * Specify a class that can be instantiated to create a key.deserializer.
+         * This is the same as setting key.deserializer, but overrides it. If you have
+         * set a custom RecordTranslator before calling this, it may result in class cast
+         * exceptions at runtime.
          */
-        public Builder(Map<String, Object> kafkaProps, KafkaSpoutStreams kafkaSpoutStreams,
-                       KafkaSpoutTuplesBuilder<K,V> tuplesBuilder, KafkaSpoutRetryService retryService) {
-            if (kafkaProps == null || kafkaProps.isEmpty()) {
-                throw new IllegalArgumentException("Properties defining consumer connection to Kafka broker are required: " + kafkaProps);
-            }
-
-            if (kafkaSpoutStreams == null)  {
-                throw new IllegalArgumentException("Must specify stream associated with each topic. Multiple topics can emit to the same stream");
-            }
-
-            if (tuplesBuilder == null) {
-                throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
-            }
-
-            if (retryService == null) {
-                throw new IllegalArgumentException("Must specify at implementation of retry service");
-            }
-
-            this.kafkaProps = kafkaProps;
-            this.kafkaSpoutStreams = kafkaSpoutStreams;
-            this.tuplesBuilder = tuplesBuilder;
-            this.retryService = retryService;
+        public <NK> Builder<NK, V> setKey(Class<? extends Deserializer<NK>> clazz) {
+            return new Builder<>(this, null, clazz, valueDes, valueDesClazz);
         }
 
         /**
-         * Specifying this key deserializer overrides the property key.deserializer
+         * Specifying this value deserializer overrides the property value.deserializer. If you have
+         * set a custom RecordTranslator before calling this, it may result in class cast
+         * exceptions at runtime.
          */
-        public Builder<K,V> setKeyDeserializer(SerializableDeserializer<K> keyDeserializer) {
-            this.keyDeserializer = keyDeserializer;
+        public <NV> Builder<K,NV> setValue(SerializableDeserializer<NV> valueDeserializer) {
+            return new Builder<>(this, keyDes, keyDesClazz, valueDeserializer, null);
+        }
+        
+        /**
+         * Specify a class that can be instantiated to create a value.deserializer.
+         * This is the same as setting value.deserializer, but overrides it. If you have
+         * set a custom RecordTranslator before calling this, it may result in class cast
+         * exceptions at runtime.
+         */
+        public <NV> Builder<K,NV> setValue(Class<? extends Deserializer<NV>> clazz) {
+            return new Builder<>(this, keyDes, keyDesClazz, null, clazz);
+        }
+        
+        /**
+         * Set a single Kafka consumer configuration property.
+         */
+        public Builder<K,V> setProp(String key, Object value) {
+            kafkaProps.put(key, value);
             return this;
         }
-
+        
         /**
-         * Specifying this value deserializer overrides the property value.deserializer
+         * Set multiple Kafka consumer configuration properties.
          */
-        public Builder<K,V> setValueDeserializer(SerializableDeserializer<V> valueDeserializer) {
-            this.valueDeserializer = valueDeserializer;
+        public Builder<K,V> setProp(Map<String, Object> props) {
+            kafkaProps.putAll(props);
             return this;
         }
+        
+        /**
+         * Set multiple Kafka consumer configuration properties.
+         */
+        public Builder<K,V> setProp(Properties props) {
+            for (String name: props.stringPropertyNames()) {
+                kafkaProps.put(name, props.get(name));
+            }
+            return this;
+        }
+        
+        /**
+         * Set the group.id for the consumers
+         */
+        public Builder<K,V> setGroupId(String id) {
+            return setProp("group.id", id);
+        }
+        
+        /**
+         * Reset the bootstrap servers for the consumer.
+         */
+        public Builder<K,V> setBootstrapServers(String servers) {
+            return setProp(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, servers);
+        }
+        
+        /**
+         * The minimum amount of data the broker should return for a fetch request.
+         */
+        public Builder<K,V> setFetchMinBytes(int bytes) {
+            return setProp(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, bytes);
+        }
+        
+        /**
+         * The maximum amount of data per-partition the broker will return.
+         */
+        public Builder<K,V> setMaxPartitionFetchBytes(int bytes) {
+            return setProp(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, bytes);
+        }
+        
+        /**
+         * The maximum number of records a poll will return.
+         * Will only work with Kafka 0.10.0 and above.
+         */
+        public Builder<K,V> setMaxPollRecords(int records) {
+            //To avoid issues with 0.9 versions, which technically still work
+            // with this client, we do not use ConsumerConfig.MAX_POLL_RECORDS_CONFIG
+            return setProp("max.poll.records", records);
+        }
+        
+        //Security Related Configs
+        
+        /**
+         * Configure the SSL Keystore for mutual authentication
+         */
+        public Builder<K,V> setSSLKeystore(String location, String password) {
+            return setProp("ssl.keystore.location", location)
+                    .setProp("ssl.keystore.password", password);
+        }
+       
+        /**
+         * Configure the SSL Keystore for mutual authentication, with a separate key password.
+         */
+        public Builder<K,V> setSSLKeystore(String location, String password, String keyPassword) {
+            return setProp("ssl.key.password", keyPassword)
+                    .setSSLKeystore(location, password);
+        }
+        
+        /**
+         * Configure the SSL Truststore to authenticate with the brokers
+         */
+        public Builder<K,V> setSSLTruststore(String location, String password) {
+            return setSecurityProtocol("SSL")
+                    .setProp("ssl.truststore.location", location)
+                    .setProp("ssl.truststore.password", password);
+        }
+        
+        /**
+         * Protocol used to communicate with brokers. 
+         * Valid values are: PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL.
+         */
+        public Builder<K, V> setSecurityProtocol(String protocol) {
+            return setProp("security.protocol", protocol);
+        }
 
+        //Spout Settings
         /**
         * Specifies the time, in milliseconds, spent waiting in poll if data is not available. Default is 200ms.
          * @param pollTimeoutMs time in ms
@@ -228,22 +365,131 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
             this.firstPollOffsetStrategy = firstPollOffsetStrategy;
             return this;
         }
+        
+        /**
+         * Sets the retry service for the spout to use.
+         * @param retryService the new retry service
+         * @return the builder (this).
+         */
+        public Builder<K, V> setRetry(KafkaSpoutRetryService retryService) {
+            if (retryService == null) {
+                throw new NullPointerException("retryService cannot be null");
+            }
+            this.retryService = retryService;
+            return this;
+        }
 
+        public Builder<K, V> setRecordTranslator(RecordTranslator<K, V> translator) {
+            this.translator = translator;
+            return this;
+        }
+        
+        /**
+         * Configure a translator with tuples to be emitted on the default stream.
+         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+         * @param fields the names of the fields extracted
+         * @return this to be able to chain configuration
+         */
+        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields));
+        }
+        
+        /**
+         * Configure a translator with tuples to be emitted to a given stream.
+         * @param func extracts and turns a Kafka ConsumerRecord into a list of objects to be emitted
+         * @param fields the names of the fields extracted
+         * @param stream the stream to emit the tuples on
+         * @return this to be able to chain configuration
+         */
+        public Builder<K, V> setRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+            return setRecordTranslator(new SimpleRecordTranslator<>(func, fields, stream));
+        }
+        
+        /**
+         * Sets partition refresh period in milliseconds. This is how often Kafka will be polled
+         * to check for new topics and/or new partitions.
+         * This is mostly for Subscription implementations that manually assign partitions. NamedSubscription and
+         * PatternSubscription rely on kafka to handle this instead.
+         * @param partitionRefreshPeriodMs time in milliseconds
+         * @return the builder (this)
+         */
+        public Builder<K, V> setPartitionRefreshPeriodMs(long partitionRefreshPeriodMs) {
+            this.partitionRefreshPeriodMs = partitionRefreshPeriodMs;
+            return this;
+        }
+        
         public KafkaSpoutConfig<K,V> build() {
             return new KafkaSpoutConfig<>(this);
         }
     }
 
+    // Kafka consumer configuration
+    private final Map<String, Object> kafkaProps;
+    private final Subscription subscription;
+    private final SerializableDeserializer<K> keyDes;
+    private final Class<? extends Deserializer<K>> keyDesClazz;
+    private final SerializableDeserializer<V> valueDes;
+    private final Class<? extends Deserializer<V>> valueDesClazz;
+    private final long pollTimeoutMs;
+
+    // Kafka spout configuration
+    private final RecordTranslator<K, V> translator;
+    private final long offsetCommitPeriodMs;
+    private final int maxRetries;
+    private final int maxUncommittedOffsets;
+    private final FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final KafkaSpoutRetryService retryService;
+    private final long partitionRefreshPeriodMs;
+
+    private KafkaSpoutConfig(Builder<K,V> builder) {
+        this.kafkaProps = setDefaultsAndGetKafkaProps(builder.kafkaProps);
+        this.subscription = builder.subscription;
+        this.translator = builder.translator;
+        this.pollTimeoutMs = builder.pollTimeoutMs;
+        this.offsetCommitPeriodMs = builder.offsetCommitPeriodMs;
+        this.maxRetries = builder.maxRetries;
+        this.firstPollOffsetStrategy = builder.firstPollOffsetStrategy;
+        this.maxUncommittedOffsets = builder.maxUncommittedOffsets;
+        this.retryService = builder.retryService;
+        this.keyDes = builder.keyDes;
+        this.keyDesClazz = builder.keyDesClazz;
+        this.valueDes = builder.valueDes;
+        this.valueDesClazz = builder.valueDesClazz;
+        this.partitionRefreshPeriodMs = builder.partitionRefreshPeriodMs;
+    }
+
     public Map<String, Object> getKafkaProps() {
         return kafkaProps;
     }
 
     public Deserializer<K> getKeyDeserializer() {
-        return keyDeserializer;
+        if (keyDesClazz != null) {
+            try {
+                return keyDesClazz.newInstance();
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new RuntimeException("Could not instantiate key deserializer " + keyDesClazz, e);
+            }
+        }
+        return keyDes;
     }
 
     public Deserializer<V> getValueDeserializer() {
-        return valueDeserializer;
+        if (valueDesClazz != null) {
+            try {
+                return valueDesClazz.newInstance();
+            } catch (InstantiationException | IllegalAccessException e) {
+                throw new RuntimeException("Could not instantiate value deserializer " + valueDesClazz, e);
+            }
+        }
+        return valueDes;
+    }
+    
+    public Subscription getSubscription() {
+        return subscription;
+    }
+    
+    public RecordTranslator<K,V> getTranslator() {
+        return translator;
     }
 
     public long getPollTimeoutMs() {
@@ -255,32 +501,12 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
     }
 
     public boolean isConsumerAutoCommitMode() {
-        return kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT) == null     // default is true
-                || Boolean.valueOf((String)kafkaProps.get(Consumer.ENABLE_AUTO_COMMIT));
+        return kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG) == null     // default is true
+                || Boolean.valueOf((String)kafkaProps.get(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG));
     }
 
     public String getConsumerGroupId() {
-        return (String) kafkaProps.get(Consumer.GROUP_ID);
-    }
-
-    /**
-     * @return list of topics subscribed and emitting tuples to a stream as configured by {@link KafkaSpoutStream},
-     * or null if this stream is associated with a wildcard pattern topic
-     */
-    public List<String> getSubscribedTopics() {
-        return kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics ?
-            new ArrayList<>(((KafkaSpoutStreamsNamedTopics) kafkaSpoutStreams).getTopics()) :
-            null;
-    }
-
-    /**
-     * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null
-     * if this stream is associated with a specific named topic
-     */
-    public Pattern getTopicWildcardPattern() {
-        return kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics ?
-                ((KafkaSpoutStreamsWildcardTopics)kafkaSpoutStreams).getTopicWildcardPattern() :
-                null;
+        return (String) kafkaProps.get(ConsumerConfig.GROUP_ID_CONFIG);
     }
 
     public int getMaxTupleRetries() {
@@ -291,38 +517,32 @@ public class KafkaSpoutConfig<K, V> implements Serializable {
         return firstPollOffsetStrategy;
     }
 
-    public KafkaSpoutStreams getKafkaSpoutStreams() {
-        return kafkaSpoutStreams;
-    }
-
     public int getMaxUncommittedOffsets() {
         return maxUncommittedOffsets;
     }
 
-    public KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() {
-        return tuplesBuilder;
-    }
-
     public KafkaSpoutRetryService getRetryService() {
         return retryService;
     }
+    
+    public long getPartitionRefreshPeriodMs() {
+        return partitionRefreshPeriodMs;
+    }
 
     @Override
     public String toString() {
         return "KafkaSpoutConfig{" +
                 "kafkaProps=" + kafkaProps +
-                ", keyDeserializer=" + keyDeserializer +
-                ", valueDeserializer=" + valueDeserializer +
+                ", key=" + getKeyDeserializer() +
+                ", value=" + getValueDeserializer() +
                 ", pollTimeoutMs=" + pollTimeoutMs +
                 ", offsetCommitPeriodMs=" + offsetCommitPeriodMs +
                 ", maxRetries=" + maxRetries +
                 ", maxUncommittedOffsets=" + maxUncommittedOffsets +
                 ", firstPollOffsetStrategy=" + firstPollOffsetStrategy +
-                ", kafkaSpoutStreams=" + kafkaSpoutStreams +
-                ", tuplesBuilder=" + tuplesBuilder +
+                ", subscription=" + subscription +
+                ", translator=" + translator +
                 ", retryService=" + retryService +
-                ", topics=" + getSubscribedTopics() +
-                ", topicWildcardPattern=" + getTopicWildcardPattern() +
                 '}';
     }
 }
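
Since most of the new builder surface lands in this file, here is a sketch of how the per-property setters chain, including the security helpers. The broker list, pattern, paths, and passwords are placeholders, and it assumes the pre-existing setFirstPollOffsetStrategy setter is retained, as the hunk context above suggests:

    import java.util.regex.Pattern;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

    public class SecureSpoutConfigExample {
        public static KafkaSpoutConfig<String, String> build() {
            return KafkaSpoutConfig
                    .builder("broker1:9092,broker2:9092", Pattern.compile("events\\..*")) // pattern subscription
                    .setGroupId("secure-group")              // group.id
                    .setFetchMinBytes(1024)                  // fetch.min.bytes
                    .setMaxPollRecords(500)                  // max.poll.records (Kafka 0.10+ only)
                    .setSSLTruststore("/etc/ssl/truststore.jks", "changeit") // also sets security.protocol=SSL
                    .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                    .build();
        }
    }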

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
index 71f8327..3cfad9d 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutMessageId.java
@@ -26,7 +26,7 @@ public class KafkaSpoutMessageId {
     private transient long offset;
     private transient int numFails = 0;
 
-    public KafkaSpoutMessageId(ConsumerRecord consumerRecord) {
+    public KafkaSpoutMessageId(ConsumerRecord<?, ?> consumerRecord) {
         this(new TopicPartition(consumerRecord.topic(), consumerRecord.partition()), consumerRecord.offset());
     }
 

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
deleted file mode 100644
index 0f444b4..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStream.java
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.io.Serializable;
-import java.util.List;
-import java.util.regex.Pattern;
-
-/**
- * Represents the stream and output fields used by a topic
- */
-public class KafkaSpoutStream implements Serializable {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStream.class);
-
-    private final Fields outputFields;
-    private final String streamId;
-    private final String topic;
-    private Pattern topicWildcardPattern;
-
-    /** Represents the specified outputFields and topic with the default stream */
-    public KafkaSpoutStream(Fields outputFields, String topic) {
-        this(outputFields, Utils.DEFAULT_STREAM_ID, topic);
-    }
-
-    /** Represents the specified outputFields and topic with the specified stream */
-    public KafkaSpoutStream(Fields outputFields, String streamId, String topic) {
-        if (outputFields == null || streamId == null || topic == null) {
-            throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
-                    "[outputFields=%s, streamId=%s, topic=%s]", outputFields, streamId, topic));
-        }
-        this.outputFields = outputFields;
-        this.streamId = streamId;
-        this.topic = topic;
-        this.topicWildcardPattern = null;
-    }
-
-    /** Represents the specified outputFields and topic wild card with the default stream */
-    public KafkaSpoutStream(Fields outputFields, Pattern topicWildcardPattern) {
-        this(outputFields, Utils.DEFAULT_STREAM_ID, topicWildcardPattern);
-    }
-
-    /** Represents the specified outputFields and topic wild card with the specified stream */
-    public KafkaSpoutStream(Fields outputFields, String streamId, Pattern topicWildcardPattern) {
-
-        if (outputFields == null || streamId == null || topicWildcardPattern == null) {
-            throw new IllegalArgumentException(String.format("Constructor parameters cannot be null. " +
-                    "[outputFields=%s, streamId=%s, topicWildcardPattern=%s]", outputFields, streamId, topicWildcardPattern));
-        }
-        this.outputFields = outputFields;
-        this.streamId = streamId;
-        this.topic = null;
-        this.topicWildcardPattern = topicWildcardPattern;
-    }
-
-    public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
-        collector.emit(streamId, tuple, messageId);
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        LOG.info("Declared [streamId = {}], [outputFields = {}] for [topic = {}]", streamId, outputFields, topic);
-        declarer.declareStream(streamId, outputFields);
-    }
-
-
-    public Fields getOutputFields() {
-        return outputFields;
-    }
-
-    public String getStreamId() {
-        return streamId;
-    }
-
-    /**
-     * @return the topic associated with this {@link KafkaSpoutStream}, or null
-     * if this stream is associated with a wildcard pattern topic
-     */
-    public String getTopic() {
-        return topic;
-    }
-
-    /**
-     * @return the wildcard pattern topic associated with this {@link KafkaSpoutStream}, or null
-     * if this stream is associated with a specific named topic
-     */
-    public Pattern getTopicWildcardPattern() {
-        return topicWildcardPattern;
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutStream{" +
-                "outputFields=" + outputFields +
-                ", streamId='" + streamId + '\'' +
-                ", topic='" + topic + '\'' +
-                ", topicWildcardPattern=" + topicWildcardPattern +
-                '}';
-    }
-}
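
For migration reference: what the deleted KafkaSpoutStream expressed (output fields plus a target stream for a topic) maps onto the new three-argument setRecordTranslator overload in KafkaSpoutConfig. A rough before/after with illustrative names, under the same lambda-for-Func assumption as the sketch near the top of this mail:

    import java.util.Arrays;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.tuple.Fields;

    public class MigrationExample {
        public static KafkaSpoutConfig<String, String> build() {
            // Before (removed): new KafkaSpoutStream(new Fields("key", "value"), "updates", "my-topic")
            // After: one translator bound to the output fields and the target stream.
            return KafkaSpoutConfig
                    .builder("localhost:9092", "my-topic")
                    .setRecordTranslator(
                            r -> Arrays.<Object>asList(r.key(), r.value()),
                            new Fields("key", "value"),
                            "updates")                       // stream the tuples are emitted on
                    .build();
        }
    }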

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
deleted file mode 100644
index d4178a9..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreams.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * Represents the {@link KafkaSpoutStream} associated with each topic or topic pattern (wildcard), and provides
- * a public API to declare output streams and emmit tuples, on the appropriate stream, for all the topics specified.
- */
-public interface KafkaSpoutStreams extends Serializable {
-    void declareOutputFields(OutputFieldsDeclarer declarer);
-
-    void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId);
-
-    Fields getOutputFields();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
deleted file mode 100644
index bc2426a..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopics.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedHashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.OutputFieldsGetter;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.utils.Utils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Represents the {@link KafkaSpoutStream} associated with each topic, and provides a public API to
- * declare output streams and emit tuples on the appropriate stream for all the topics specified.
- */
-public class KafkaSpoutStreamsNamedTopics implements KafkaSpoutStreams {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutStreamsNamedTopics.class);
-
-    private final Map<String, KafkaSpoutStream> topicToStream;
-
-    private KafkaSpoutStreamsNamedTopics(Builder builder) {
-        this.topicToStream = builder.topicToStream;
-        LOG.debug("Built {}", this);
-    }
-
-    /**
-     * @param topic the topic for which to get output fields
-     * @return the declared output fields
-     */
-    public Fields getOutputFields(String topic) {
-        if (topicToStream.containsKey(topic)) {
-            final Fields outputFields = topicToStream.get(topic).getOutputFields();
-            LOG.trace("Topic [{}] has output fields [{}]", topic, outputFields);
-            return outputFields;
-        }
-        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        final Set<String> allFields = new LinkedHashSet<>();
-        for (KafkaSpoutStream kafkaSpoutStream : topicToStream.values()) {
-            allFields.addAll(kafkaSpoutStream.getOutputFields().toList());
-        }
-        return new Fields(new ArrayList<>(allFields));
-    }
-
-    /**
-     * @param topic the topic for which to get the associated stream
-     * @return the {@link KafkaSpoutStream} to which tuples read from this topic are emitted
-     */
-    public KafkaSpoutStream getStream(String topic) {
-        if (topicToStream.containsKey(topic)) {
-            return topicToStream.get(topic);
-        }
-        throw new IllegalStateException(this.getClass().getName() + " not configured for topic: " + topic);
-    }
-
-    /**
-     * @return the list of subscribed topics, each of which emits tuples to a stream as configured by its {@link KafkaSpoutStream}
-     */
-    public List<String> getTopics() {
-        return new ArrayList<>(topicToStream.keySet());
-    }
-
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        for (KafkaSpoutStream stream : topicToStream.values()) {
-            if (!((OutputFieldsGetter)declarer).getFieldsDeclaration().containsKey(stream.getStreamId())) {
-                stream.declareOutputFields(declarer);
-            }
-        }
-    }
-
-    public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
-        getStream(messageId.topic()).emit(collector, tuple, messageId);
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutStreamsNamedTopics{" +
-                "topicToStream=" + topicToStream +
-                '}';
-    }
-
-    public static class Builder {
-        private final Map<String, KafkaSpoutStream> topicToStream = new HashMap<>();
-
-        /**
-         * Creates a {@link KafkaSpoutStream} with the given output Fields for each topic specified.
-         * All topics will have the default stream id and the same output fields.
-         */
-        public Builder(Fields outputFields, String... topics) {
-            addStream(outputFields, topics);
-        }
-
-        /**
-         * Creates a {@link KafkaSpoutStream} with the given stream id for each topic specified.
-         * All the topics will have the specified stream id and the same output fields.
-         */
-        public Builder(Fields outputFields, String streamId, String... topics) {
-            addStream(outputFields, streamId, topics);
-        }
-
-        /**
-         * Creates a builder initialized with the given stream.
-         */
-        public Builder(KafkaSpoutStream stream) {
-            addStream(stream);
-        }
-
-        /**
-         * Adds the given stream to the mapping from each topic to its stream.
-         */
-        public Builder addStream(KafkaSpoutStream stream) {
-            topicToStream.put(stream.getTopic(), stream);
-            return this;
-        }
-
-        /**
-         * Please refer to javadoc in {@link #Builder(Fields, String...)}
-         */
-        public Builder addStream(Fields outputFields, String... topics) {
-            addStream(outputFields, Utils.DEFAULT_STREAM_ID, topics);
-            return this;
-        }
-
-        /**
-         * Please refer to javadoc in {@link #Builder(Fields, String, String...)}
-         */
-        public Builder addStream(Fields outputFields, String streamId, String... topics) {
-            for (String topic : topics) {
-                topicToStream.put(topic, new KafkaSpoutStream(outputFields, streamId, topic));
-            }
-            return this;
-        }
-
-        public KafkaSpoutStreamsNamedTopics build() {
-            return new KafkaSpoutStreamsNamedTopics(this);
-        }
-    }
-}
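
For reference, the builder removed above was typically wired up along the following lines. This is a
reconstruction from the deleted API, not code from the patch; the topic names and output fields are
illustrative:

    import org.apache.storm.tuple.Fields;

    // Hypothetical usage of the removed KafkaSpoutStreamsNamedTopics.Builder.
    KafkaSpoutStreams streams = new KafkaSpoutStreamsNamedTopics.Builder(
            new Fields("key", "value"), "topic1", "topic2")           // default stream id
        .addStream(new Fields("key", "value"), "errors", "topic3")    // explicit stream id
        .build();

After this commit that indirection is gone: stream routing is instead expressed through the
RecordTranslator and the KafkaTuple class added further down in this patch.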

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
deleted file mode 100644
index 64132b3..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsWildcardTopics.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.tuple.Fields;
-
-import java.util.List;
-import java.util.regex.Pattern;
-
-public class KafkaSpoutStreamsWildcardTopics implements KafkaSpoutStreams {
-    private KafkaSpoutStream kafkaSpoutStream;
-
-    public KafkaSpoutStreamsWildcardTopics(KafkaSpoutStream kafkaSpoutStream) {
-        this.kafkaSpoutStream = kafkaSpoutStream;
-        if (kafkaSpoutStream.getTopicWildcardPattern() == null) {
-            throw new IllegalStateException("KafkaSpoutStream must be configured for wildcard topic");
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-        kafkaSpoutStream.declareOutputFields(declarer);
-    }
-
-    @Override
-    public void emit(SpoutOutputCollector collector, List<Object> tuple, KafkaSpoutMessageId messageId) {
-        kafkaSpoutStream.emit(collector, tuple, messageId);
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return kafkaSpoutStream.getOutputFields();
-    }
-
-    public KafkaSpoutStream getStream() {
-        return kafkaSpoutStream;
-    }
-
-    public Pattern getTopicWildcardPattern() {
-        return kafkaSpoutStream.getTopicWildcardPattern();
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutStreamsWildcardTopics{" +
-                "kafkaSpoutStream=" + kafkaSpoutStream +
-                '}';
-    }
-}
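
The wildcard variant only ever wrapped a single stream. A minimal construction sketch, assuming the
Pattern-based KafkaSpoutStream constructor implied by getTopicWildcardPattern() earlier in this patch:

    import java.util.regex.Pattern;
    import org.apache.storm.tuple.Fields;

    // Hypothetical; assumes KafkaSpoutStream(Fields, Pattern) existed, as the getters imply.
    KafkaSpoutStreams streams = new KafkaSpoutStreamsWildcardTopics(
            new KafkaSpoutStream(new Fields("key", "value"), Pattern.compile("test-topic-.*")));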

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
deleted file mode 100644
index 3bb71a8..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTupleBuilder.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.io.Serializable;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-/**
- * Implementations of {@link KafkaSpoutTupleBuilder} contain the logic to build tuples from {@link ConsumerRecord}s.
- * Users must subclass this abstract class to provide their implementation. See also {@link KafkaSpoutTuplesBuilder}
- */
-public abstract class KafkaSpoutTupleBuilder<K,V> implements Serializable {
-    private List<String> topics;
-
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public KafkaSpoutTupleBuilder(String... topics) {
-        if (topics == null || topics.length == 0) {
-            throw new IllegalArgumentException("Must specify at least one topic. It cannot be null or empty");
-        }
-        this.topics = Arrays.asList(topics);
-    }
-
-    /**
-     * @return list of topics that use this implementation to build tuples
-     */
-    public List<String> getTopics() {
-        return Collections.unmodifiableList(topics);
-    }
-
-    /**
-     * Builds the values of a single tuple from the ConsumerRecord specified as parameter
-     * @param consumerRecord whose contents are used to build the tuple
-     * @return the tuple, as a list of values
-     */
-    public abstract List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord);
-}
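
As the javadoc above says, users subclassed this class to turn ConsumerRecords into tuples. A minimal
subclass might have looked like the sketch below; the (topic, key, value) field layout is an assumption
for illustration, not part of the patch:

    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.tuple.Values;

    // Illustrative subclass of the removed KafkaSpoutTupleBuilder.
    public class TopicKeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K, V> {
        public TopicKeyValueTupleBuilder(String... topics) {
            super(topics);
        }

        @Override
        public List<Object> buildTuple(ConsumerRecord<K, V> record) {
            // One tuple per record: (topic, key, value).
            return new Values(record.topic(), record.key(), record.value());
        }
    }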

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
deleted file mode 100644
index 2ba0a79..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilder.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.io.Serializable;
-import java.util.List;
-
-/**
- * {@link KafkaSpoutTuplesBuilder} wraps all the logic that builds tuples from {@link ConsumerRecord}s.
- * The logic is provided by the user through one or more {@link KafkaSpoutTupleBuilder} instances, each covering its declared topics
- */
-public interface KafkaSpoutTuplesBuilder<K,V> extends Serializable {
-    List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord);
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
deleted file mode 100644
index 80fe543..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderNamedTopics.java
+++ /dev/null
@@ -1,78 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class KafkaSpoutTuplesBuilderNamedTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> {
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaSpoutTuplesBuilderNamedTopics.class);
-
-    private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
-
-    private KafkaSpoutTuplesBuilderNamedTopics(Builder<K,V> builder) {
-        this.topicToTupleBuilders = builder.topicToTupleBuilders;
-        LOG.debug("Instantiated {}", this);
-    }
-
-    public static class Builder<K,V> {
-        private List<KafkaSpoutTupleBuilder<K, V>> tupleBuilders;
-        private Map<String, KafkaSpoutTupleBuilder<K, V>> topicToTupleBuilders;
-
-        @SafeVarargs
-        public Builder(KafkaSpoutTupleBuilder<K,V>... tupleBuilders) {
-            if (tupleBuilders == null || tupleBuilders.length == 0) {
-                throw new IllegalArgumentException("Must specify at last one tuple builder per topic declared in KafkaSpoutStreams");
-            }
-
-            this.tupleBuilders = Arrays.asList(tupleBuilders);
-            topicToTupleBuilders = new HashMap<>();
-        }
-
-        public KafkaSpoutTuplesBuilderNamedTopics<K,V> build() {
-            for (KafkaSpoutTupleBuilder<K, V> tupleBuilder : tupleBuilders) {
-                for (String topic : tupleBuilder.getTopics()) {
-                    if (!topicToTupleBuilders.containsKey(topic)) {
-                        topicToTupleBuilders.put(topic, tupleBuilder);
-                    }
-                }
-            }
-            return new KafkaSpoutTuplesBuilderNamedTopics<>(this);
-        }
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        final String topic = consumerRecord.topic();
-        return topicToTupleBuilders.get(topic).buildTuple(consumerRecord);
-    }
-
-    @Override
-    public String toString() {
-        return "KafkaSpoutTuplesBuilderNamedTopics {" +
-                "topicToTupleBuilders=" + topicToTupleBuilders +
-                '}';
-    }
-
-}
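
The named-topics wrapper then dispatched each record to the builder registered for its topic. Continuing
the hypothetical TopicKeyValueTupleBuilder from the sketch above, wiring looked roughly like this:

    // Hypothetical wiring; topic names are illustrative.
    KafkaSpoutTuplesBuilder<String, String> tuplesBuilder =
        new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
                new TopicKeyValueTupleBuilder<String, String>("topic1", "topic2"))
            .build();

Note that buildTuple above performs an unchecked map lookup, so a record from a topic with no registered
builder would fail with a NullPointerException.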

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
deleted file mode 100644
index 85d4809..0000000
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaSpoutTuplesBuilderWildcardTopics.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-
-package org.apache.storm.kafka.spout;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-
-import java.util.List;
-
-public class KafkaSpoutTuplesBuilderWildcardTopics<K,V> implements KafkaSpoutTuplesBuilder<K,V> {
-    private KafkaSpoutTupleBuilder<K, V> tupleBuilder;
-
-    public KafkaSpoutTuplesBuilderWildcardTopics(KafkaSpoutTupleBuilder<K, V> tupleBuilder) {
-        this.tupleBuilder = tupleBuilder;
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return tupleBuilder.buildTuple(consumerRecord);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
new file mode 100644
index 0000000..f5953ad
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/KafkaTuple.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import org.apache.storm.tuple.Values;
+
+/**
+ * A list of Values in a tuple that can be routed
+ * to a given stream. See {@link org.apache.storm.kafka.spout.RecordTranslator#apply}.
+ */
+public class KafkaTuple extends Values {
+    private static final long serialVersionUID = 4803794470450587992L;
+    private String stream = null;
+    
+    public KafkaTuple() {
+        super();
+    }
+    
+    public KafkaTuple(Object... vals) {
+        super(vals);
+    }
+    
+    public KafkaTuple routedTo(String stream) {
+        assert(this.stream == null);
+        this.stream = stream;
+        return this;
+    }
+
+    public String getStream() {
+        return stream;
+    }
+}
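
KafkaTuple is the simpler replacement for the stream machinery deleted above: the translator returns the
tuple's values plus an optional stream to route them to. A minimal sketch of a translator body, assuming
the RecordTranslator signature referenced in the javadoc (the interface itself is not in this hunk):

    import org.apache.kafka.clients.consumer.ConsumerRecord;

    // Sketch only; "updates" is an illustrative stream name.
    public KafkaTuple apply(ConsumerRecord<String, String> record) {
        return new KafkaTuple(record.topic(), record.key(), record.value())
                .routedTo("updates"); // emit on the "updates" stream instead of the default
    }

Note that routedTo may be called at most once per tuple; the assert above guards against re-routing.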

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
new file mode 100644
index 0000000..df3e800
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionNamedSubscription.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+public class ManualPartitionNamedSubscription extends NamedSubscription {
+    private static final long serialVersionUID = 5633018073527583826L;
+    private final ManualPartitioner partitioner;
+    private Set<TopicPartition> currentAssignment = null;
+    private KafkaConsumer<?, ?> consumer = null;
+    private ConsumerRebalanceListener listener = null;
+    private TopologyContext context = null;
+
+    public ManualPartitionNamedSubscription(ManualPartitioner parter, Collection<String> topics) {
+        super(topics);
+        this.partitioner = parter;
+    }
+    
+    public ManualPartitionNamedSubscription(ManualPartitioner parter, String... topics) {
+        this(parter, Arrays.asList(topics));
+    }
+    
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
+        this.consumer = consumer;
+        this.listener = listener;
+        this.context = context;
+        refreshAssignment();
+    }
+    
+    @Override
+    public void refreshAssignment() {
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for (String topic : topics) {
+            for (PartitionInfo partitionInfo: consumer.partitionsFor(topic)) {
+                allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+            }
+        }
+        Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
+        Set<TopicPartition> newAssignment = new HashSet<>(partitioner.partition(allPartitions, context));
+        if (!newAssignment.equals(currentAssignment)) {
+            if (currentAssignment != null) {
+                listener.onPartitionsRevoked(currentAssignment);
+            }
+            currentAssignment = newAssignment;
+            consumer.assign(currentAssignment);
+            // Notify after assign() so seeks in the listener act on the consumer's
+            // actual assignment, and fire on the initial assignment as well.
+            listener.onPartitionsAssigned(currentAssignment);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
new file mode 100644
index 0000000..cf4dfcb
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitionPatternSubscription.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.PartitionInfo;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+public class ManualPartitionPatternSubscription extends PatternSubscription {
+    private static final long serialVersionUID = 5633018073527583826L;
+    private final ManualPartitioner parter;
+    private Set<TopicPartition> currentAssignment = null;
+    private KafkaConsumer<?, ?> consumer = null;
+    private ConsumerRebalanceListener listener = null;
+    private TopologyContext context = null;
+
+    public ManualPartitionPatternSubscription(ManualPartitioner parter, Pattern pattern) {
+        super(pattern);
+        this.parter = parter;
+    }
+    
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext context) {
+        this.consumer = consumer;
+        this.listener = listener;
+        this.context = context;
+        refreshAssignment();
+    }
+    
+    @Override
+    public void refreshAssignment() {
+        List<TopicPartition> allPartitions = new ArrayList<>();
+        for (Map.Entry<String, List<PartitionInfo>> entry : consumer.listTopics().entrySet()) {
+            if (pattern.matcher(entry.getKey()).matches()) {
+                for (PartitionInfo partitionInfo: entry.getValue()) {
+                    allPartitions.add(new TopicPartition(partitionInfo.topic(), partitionInfo.partition()));
+                }
+            }
+        }
+        Collections.sort(allPartitions, TopicPartitionComparator.INSTANCE);
+        Set<TopicPartition> newAssignment = new HashSet<>(parter.partition(allPartitions, context));
+        if (!newAssignment.equals(currentAssignment)) {
+            if (currentAssignment != null) {
+                listener.onPartitionsRevoked(currentAssignment);
+            }
+            currentAssignment = newAssignment;
+            consumer.assign(currentAssignment);
+            // Notify after assign() so seeks in the listener act on the consumer's
+            // actual assignment, and fire on the initial assignment as well.
+            listener.onPartitionsAssigned(currentAssignment);
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
new file mode 100644
index 0000000..88803f8
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/ManualPartitioner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.List;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A function used to assign partitions to this spout.
+ * WARNING: if the assignment is not done correctly, some partitions may never be read, or may be
+ * read by more than one spout instance. The complete TopologyContext is passed in, but it is suggested
+ * that you use the index of this spout and the total number of spouts to avoid missing or
+ * double-assigning partitions.
+ */
+@FunctionalInterface
+public interface ManualPartitioner {
+    /**
+     * Get the partitions for this assignment
+     * @param allPartitions all of the partitions that the set of spouts want to subscribe to, in a strict ordering
+     * @param context the context of the topology
+     * @return the subset of the partitions that this spout should use.
+     */
+    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context);
+}
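
To make this contract concrete, below is a minimal round-robin partitioner sketch (not part of this
patch). It uses the spout's task index and total task count as the javadoc suggests, and it relies on
allPartitions being identically ordered on every task, which the subscriptions above guarantee by
sorting with TopicPartitionComparator:

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.task.TopologyContext;

    // Minimal round-robin sketch: every partition lands on exactly one spout task.
    public class RoundRobinManualPartitioner implements ManualPartitioner {
        @Override
        public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
            int thisTask = context.getThisTaskIndex();
            int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
            List<TopicPartition> mine = new ArrayList<>();
            for (int i = thisTask; i < allPartitions.size(); i += totalTasks) {
                mine.add(allPartitions.get(i)); // take partitions thisTask, thisTask + totalTasks, ...
            }
            return mine;
        }
    }

It would plug into either subscription above, for example
new ManualPartitionNamedSubscription(new RoundRobinManualPartitioner(), "topic1").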