Posted to commits@storm.apache.org by bo...@apache.org on 2017/02/01 15:56:26 UTC

[3/8] storm git commit: STORM-1997: copy state/bolt from storm-kafka to storm-kafka-client STORM-2225: change spout config to be simpler. STORM-2228: removed ability to request a single topic go to multiple streams STORM-2236: Reimplemented manual partit

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
new file mode 100644
index 0000000..6c5dcfb
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/NamedSubscription.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribe to a fixed list of topics, specified by name.
+ */
+public class NamedSubscription extends Subscription {
+    private static final Logger LOG = LoggerFactory.getLogger(NamedSubscription.class);
+    private static final long serialVersionUID = 3438543305215813839L;
+    protected final Collection<String> topics;
+    
+    public NamedSubscription(Collection<String> topics) {
+        this.topics = Collections.unmodifiableCollection(new ArrayList<>(topics));
+    }
+    
+    public NamedSubscription(String ... topics) {
+        this(Arrays.asList(topics));
+    }
+
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
+        consumer.subscribe(topics, listener);
+        LOG.info("Kafka consumer subscribed topics {}", topics);
+        
+        // Initial poll to get the consumer registration process going.
+        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
+        consumer.poll(0);
+    }
+
+    @Override
+    public String getTopicsString() {
+        return String.valueOf(topics);
+    }
+}
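
For reference, a minimal sketch of exercising NamedSubscription directly; the broker address, group id, topic names, and the no-op rebalance listener are all assumptions for illustration (inside a topology the spout performs this wiring itself):

    import java.util.Collection;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.kafka.spout.NamedSubscription;

    public class NamedSubscriptionDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092"); // assumed broker
            props.put("group.id", "demo-group");              // assumed group id
            props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            // Subscribe to two named topics; the listener here intentionally does nothing.
            new NamedSubscription("topic-a", "topic-b").subscribe(consumer,
                    new ConsumerRebalanceListener() {
                        public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
                        public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
                    }, null); // the TopologyContext argument is unused by NamedSubscription
            consumer.close();
        }
    }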

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
new file mode 100644
index 0000000..9a8de0f
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/PatternSubscription.java
@@ -0,0 +1,54 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.regex.Pattern;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Subscribe to all topics that match a given pattern.
+ */
+public class PatternSubscription extends Subscription {
+    private static final Logger LOG = LoggerFactory.getLogger(PatternSubscription.class);
+    private static final long serialVersionUID = 3438543305215813839L;
+    protected final Pattern pattern;
+    
+    public PatternSubscription(Pattern pattern) {
+        this.pattern = pattern;
+    }
+
+    @Override
+    public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener, TopologyContext unused) {
+        consumer.subscribe(pattern, listener);
+        LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
+        
+        // Initial poll to get the consumer registration process going.
+        // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
+        consumer.poll(0);
+    }
+
+    @Override
+    public String getTopicsString() {
+        return pattern.pattern();
+    }
+}
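
PatternSubscription is used the same way, but matches topics by regex, so topics created later that also match the pattern get picked up. Reusing the consumer and listener from the sketch above (the pattern itself is illustrative):

    import java.util.regex.Pattern;
    import org.apache.storm.kafka.spout.PatternSubscription;

    Subscription sub = new PatternSubscription(Pattern.compile("user-events-.*"));
    sub.subscribe(consumer, listener, null); // TopologyContext is unused here too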

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
new file mode 100644
index 0000000..2e72c99
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RecordTranslator.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+/**
+ * Translate a {@link org.apache.kafka.clients.consumer.ConsumerRecord} to a tuple.
+ */
+public interface RecordTranslator<K, V> extends Serializable, Func<ConsumerRecord<K, V>, List<Object>> {
+    public static final List<String> DEFAULT_STREAM = Collections.singletonList("default");
+    
+    /**
+     * Translate the ConsumerRecord into a list of objects that can be emitted
+     * @param record the record to translate
+     * @return the objects in the tuple.  Return a {@link KafkaTuple}
+     * if you want to route the tuple to a non-default stream
+     */
+    List<Object> apply(ConsumerRecord<K,V> record);
+    
+    /**
+     * Get the fields associated with a stream.  The stream passed in is one
+     * of those returned by the {@link RecordTranslator#streams()} method.
+     * @param stream the stream the fields are for
+     * @return the fields for that stream.
+     */
+    Fields getFieldsFor(String stream);
+    
+    /**
+     * @return the list of streams that this translator will handle.
+     */
+    List<String> streams();
+}
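
To make the contract concrete, here is a hypothetical translator that emits (topic, key, value) on the default stream; the class and field names are illustrative, not part of this commit:

    import java.util.Arrays;
    import java.util.List;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.RecordTranslator;
    import org.apache.storm.tuple.Fields;

    public class TopicKeyValueTranslator<K, V> implements RecordTranslator<K, V> {
        private static final long serialVersionUID = 1L;

        @Override
        public List<Object> apply(ConsumerRecord<K, V> record) {
            // Returning a plain List<Object> (not a KafkaTuple) routes to the default stream.
            return Arrays.asList(record.topic(), record.key(), record.value());
        }

        @Override
        public Fields getFieldsFor(String stream) {
            return new Fields("topic", "key", "value");
        }

        @Override
        public List<String> streams() {
            return DEFAULT_STREAM;
        }
    }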

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
new file mode 100644
index 0000000..e23e2dc
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/RoundRobinManualPartitioner.java
@@ -0,0 +1,50 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * Assign partitions in a round-robin fashion across all spout tasks,
+ * not just the ones that are alive. Because the parallelism of the
+ * spouts does not typically change while running, this keeps the
+ * assignments stable when individual spouts crash.
+ * 
+ * Round robin here means that the first of N spout tasks gets the first
+ * partition, the (N+1)th partition, and so on; the second task gets the
+ * second partition, the (N+2)th partition, etc. For example, with 8
+ * partitions and 3 tasks, task 0 is assigned partitions 0, 3 and 6.
+ */
+public class RoundRobinManualPartitioner implements ManualPartitioner {
+
+    @Override
+    public List<TopicPartition> partition(List<TopicPartition> allPartitions, TopologyContext context) {
+        int thisTaskIndex = context.getThisTaskIndex();
+        int totalTaskCount = context.getComponentTasks(context.getThisComponentId()).size();
+        Set<TopicPartition> myPartitions = new HashSet<>(allPartitions.size()/totalTaskCount+1);
+        for (int i = thisTaskIndex; i < allPartitions.size(); i += totalTaskCount) {
+            myPartitions.add(allPartitions.get(i));
+        }
+        return new ArrayList<>(myPartitions);
+    }
+}
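
A worked illustration of the assignment loop above, reduced to plain arithmetic (the 8 partitions and 3 tasks are arbitrary example sizes):

    public class RoundRobinDemo {
        public static void main(String[] args) {
            int totalTaskCount = 3, partitionCount = 8;
            // Prints: task 0 -> partitions 0, 3, 6; task 1 -> 1, 4, 7; task 2 -> 2, 5
            for (int task = 0; task < totalTaskCount; task++) {
                for (int p = task; p < partitionCount; p += totalTaskCount) {
                    System.out.println("task " + task + " <- partition " + p);
                }
            }
        }
    }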

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
new file mode 100644
index 0000000..46c2849
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/SimpleRecordTranslator.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+
+public class SimpleRecordTranslator<K, V> implements RecordTranslator<K, V> {
+    private static final long serialVersionUID = 4678369144122009596L;
+    private final Fields fields;
+    private final Func<ConsumerRecord<K, V>, List<Object>> func;
+    private final String stream;
+
+    public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields) {
+        this(func, fields, "default");
+    }
+    
+    public SimpleRecordTranslator(Func<ConsumerRecord<K, V>, List<Object>> func, Fields fields, String stream) {
+        this.func = func;
+        this.fields = fields;
+        this.stream = stream;
+    }
+    
+    @Override
+    public List<Object> apply(ConsumerRecord<K, V> record) {
+        KafkaTuple ret = new KafkaTuple();
+        ret.addAll(func.apply(record));
+        return ret.routedTo(stream);
+    }
+
+    @Override
+    public Fields getFieldsFor(String stream) {
+        return fields;
+    }
+    
+    @Override
+    public List<String> streams() {
+        return Arrays.asList(stream);
+    }
+}
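
A hedged construction sketch; it assumes Func is a single-abstract-method interface, so a lambda can stand in for it:

    SimpleRecordTranslator<String, String> translator = new SimpleRecordTranslator<>(
            r -> Arrays.asList(r.key(), r.value()), // Func<ConsumerRecord<String, String>, List<Object>>
            new Fields("key", "value"));            // one Fields declaration for the single stream
    // translator.apply(record) wraps the values in a KafkaTuple routed to the "default" stream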

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
new file mode 100644
index 0000000..db2a7bb
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/Subscription.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.io.Serializable;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.storm.task.TopologyContext;
+
+/**
+ * A subscription to Kafka.
+ */
+public abstract class Subscription implements Serializable {
+    private static final long serialVersionUID = -216136367240198716L;
+
+    /**
+     * Subscribe the KafkaConsumer to the proper topics.
+     * @param consumer the consumer to subscribe
+     * @param listener the rebalance listener to include in the subscription
+     * @param context the topology context of the spout task
+     */
+    public abstract <K, V> void subscribe(KafkaConsumer<K,V> consumer, ConsumerRebalanceListener listener, TopologyContext context);
+    
+    /**
+     * @return a string representing the subscribed topics.
+     */
+    public abstract String getTopicsString();
+    
+    /**
+     * NOOP is the default behavior, which means that Kafka will internally handle partition assignment.
+     * If you wish to do manual partition management, you must provide an implementation of this method
+     * that checks with Kafka for any changes and calls the ConsumerRebalanceListener passed in
+     * through subscribe to inform the rest of the system of those changes.
+     */
+    public void refreshAssignment() {
+        //NOOP
+    }
+}
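
As the refreshAssignment javadoc suggests, a manual-assignment subclass would assign partitions itself and re-check them periodically. A rough, hypothetical single-topic outline (not part of this commit; a real implementation would also use a ManualPartitioner and the TopologyContext to divide partitions among spout tasks):

    import java.util.HashSet;
    import java.util.Set;
    import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.PartitionInfo;
    import org.apache.kafka.common.TopicPartition;
    import org.apache.storm.task.TopologyContext;

    public class ManualTopicSubscription extends Subscription {
        private static final long serialVersionUID = 1L;
        private final String topic;
        private transient KafkaConsumer<?, ?> consumer;
        private transient ConsumerRebalanceListener listener;
        private transient Set<TopicPartition> current;

        public ManualTopicSubscription(String topic) {
            this.topic = topic;
        }

        @Override
        public <K, V> void subscribe(KafkaConsumer<K, V> consumer, ConsumerRebalanceListener listener,
                TopologyContext context) {
            this.consumer = consumer;
            this.listener = listener;
            this.current = new HashSet<>();
            refreshAssignment();
        }

        @Override
        public String getTopicsString() {
            return topic;
        }

        @Override
        public void refreshAssignment() {
            // Ask Kafka for the current partitions and re-assign only when they changed.
            Set<TopicPartition> latest = new HashSet<>();
            for (PartitionInfo info : consumer.partitionsFor(topic)) {
                latest.add(new TopicPartition(info.topic(), info.partition()));
            }
            if (!latest.equals(current)) {
                listener.onPartitionsRevoked(current);
                consumer.assign(latest);
                current = latest;
                listener.onPartitionsAssigned(current);
            }
        }
    }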

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
new file mode 100644
index 0000000..dafb97c
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/TopicPartitionComparator.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import java.util.Comparator;
+
+import org.apache.kafka.common.TopicPartition;
+
+/**
+ * Singleton comparator of TopicPartitions.  Topics have precedence over partitions.
+ * Topics are compared through String.compareTo and partitions are compared
+ * numerically.
+ * 
+ * Use INSTANCE for all sorting.
+ */
+public class TopicPartitionComparator implements Comparator<TopicPartition> {
+    public static final TopicPartitionComparator INSTANCE = new TopicPartitionComparator();
+    
+    /**
+     * Private to make it a singleton
+     */
+    private TopicPartitionComparator() {
+        //Empty
+    }
+    
+    @Override
+    public int compare(TopicPartition o1, TopicPartition o2) {
+        if (!o1.topic().equals(o2.topic())) {
+            return o1.topic().compareTo(o2.topic());
+        } else {
+            return o1.partition() - o2.partition();
+        }
+    }
+}
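
Typical use is sorting a mixed partition list:

    import java.util.ArrayList;
    import java.util.Arrays;
    import java.util.Collections;
    import java.util.List;
    import org.apache.kafka.common.TopicPartition;

    List<TopicPartition> tps = new ArrayList<>(Arrays.asList(
            new TopicPartition("b", 0), new TopicPartition("a", 2), new TopicPartition("a", 1)));
    Collections.sort(tps, TopicPartitionComparator.INSTANCE);
    // sorted order: a-1, a-2, b-0 (topic name first, then partition number)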

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
new file mode 100644
index 0000000..d51104d
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/internal/Timer.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout.internal;
+
+import java.util.concurrent.TimeUnit;
+
+public class Timer {
+    private final long delay;
+    private final long period;
+    private final TimeUnit timeUnit;
+    private final long periodNanos;
+    private long start;
+
+    /**
+     * Creates an object that mimics a single-threaded timer that expires periodically. If a call to {@link
+     * #isExpiredResetOnTrue()} occurs later than {@code period} after the timer was initiated or reset, that
+     * method returns true. Each time the method returns true, the timer is reset. The timer starts with the
+     * specified time delay.
+     *
+     * @param delay    the initial delay before the timer starts
+     * @param period   the period between calls to {@link #isExpiredResetOnTrue()}
+     * @param timeUnit the time unit of delay and period
+     */
+    public Timer(long delay, long period, TimeUnit timeUnit) {
+        this.delay = delay;
+        this.period = period;
+        this.timeUnit = timeUnit;
+
+        periodNanos = timeUnit.toNanos(period);
+        start = System.nanoTime() + timeUnit.toNanos(delay);
+    }
+
+    public long period() {
+        return period;
+    }
+
+    public long delay() {
+        return delay;
+    }
+
+    public TimeUnit getTimeUnit() {
+        return timeUnit;
+    }
+
+    /**
+     * Checks if a call to this method occurs later than {@code period} since the timer was initiated or reset. If that is the
+     * case the method returns true, otherwise it returns false. Each time this method returns true, the counter is reset
+     * (re-initiated) and a new cycle will start.
+     *
+     * @return true if the time elapsed since the last call returning true is greater than {@code period}. Returns false
+     * otherwise.
+     */
+    public boolean isExpiredResetOnTrue() {
+        final boolean expired = System.nanoTime() - start > periodNanos;
+        if (expired) {
+            start = System.nanoTime();
+        }
+        return expired;
+    }
+}
\ No newline at end of file
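
A short sketch of the polling pattern this class supports (the 500 ms delay and 2 s period are arbitrary; 'running' stands in for the caller's loop flag):

    Timer refresh = new Timer(500, 2000, TimeUnit.MILLISECONDS);
    while (running) {
        if (refresh.isExpiredResetOnTrue()) {
            // entered at most once per 2 s window, e.g. to refresh partition assignments
        }
        // ... regular per-iteration work ...
    }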

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
index 0218f07..19b4f01 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -18,33 +18,38 @@
 
 package org.apache.storm.kafka.spout.trident;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.operation.TridentCollector;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.trident.topology.TransactionAttempt;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
-
 public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentSpout.Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>>, Serializable {
+    private static final long serialVersionUID = -7343927794834130435L;
+
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
 
     // Kafka
@@ -53,18 +58,20 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
     // Bookkeeping
     private final KafkaTridentSpoutManager<K, V> kafkaManager;
     // Declare some KafkaTridentSpoutManager references for convenience
-    private final KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;
     private final long pollTimeoutMs;
     private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final RecordTranslator<K, V> translator;
+    private final Timer refreshSubscriptionTimer;
 
-    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager) {
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K,V> kafkaManager, TopologyContext context) {
         this.kafkaManager = kafkaManager;
-        this.kafkaManager.subscribeKafkaConsumer();
+        this.kafkaManager.subscribeKafkaConsumer(context);
+        refreshSubscriptionTimer = new Timer(500, kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS);
 
         //must subscribeKafkaConsumer before this line
         kafkaConsumer = kafkaManager.getKafkaConsumer();
+        translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
 
-        tuplesBuilder = kafkaManager.getTuplesBuilder();
         final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
         pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
         firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
@@ -88,6 +95,9 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
             seek(topicPartition, lastBatch);
 
             // poll
+            if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+                kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
+            }
             final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
             LOG.debug("Polled [{}] records from Kafka.", records.count());
 
@@ -106,7 +116,7 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
 
     private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
         for (ConsumerRecord<K, V> record : records) {
-            final List<Object> tuple = tuplesBuilder.buildTuple(record);
+            final List<Object> tuple = translator.apply(record);
             collector.emit(tuple);
             LOG.debug("Emitted tuple [{}] for record: [{}]", tuple, record);
         }
@@ -131,18 +141,18 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
             final OffsetAndMetadata committedOffset = kafkaConsumer.committed(tp);
             if (committedOffset != null) {             // offset was committed for this TopicPartition
                 if (firstPollOffsetStrategy.equals(EARLIEST)) {
-                    kafkaConsumer.seekToBeginning(toArrayList(tp));
+                    kafkaConsumer.seekToBeginning(Collections.singleton(tp));
                 } else if (firstPollOffsetStrategy.equals(LATEST)) {
-                    kafkaConsumer.seekToEnd(toArrayList(tp));
+                    kafkaConsumer.seekToEnd(Collections.singleton(tp));
                 } else {
                     // By default polling starts at the last committed offset. +1 to point fetch to the first uncommitted offset.
                     kafkaConsumer.seek(tp, committedOffset.offset() + 1);
                 }
             } else {    // no commits have ever been done, so start at the beginning or end depending on the strategy
                 if (firstPollOffsetStrategy.equals(EARLIEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_EARLIEST)) {
-                    kafkaConsumer.seekToBeginning(toArrayList(tp));
+                    kafkaConsumer.seekToBeginning(Collections.singleton(tp));
                 } else if (firstPollOffsetStrategy.equals(LATEST) || firstPollOffsetStrategy.equals(UNCOMMITTED_LATEST)) {
-                    kafkaConsumer.seekToEnd(toArrayList(tp));
+                    kafkaConsumer.seekToEnd(Collections.singleton(tp));
                 }
             }
         }
@@ -151,10 +161,6 @@ public class KafkaTridentSpoutEmitter<K,V> implements IOpaquePartitionedTridentS
         return fetchOffset;
     }
 
-    private Collection<TopicPartition> toArrayList(final TopicPartition tp) {
-        return new ArrayList<TopicPartition>(1){{add(tp);}};
-    }
-
     // returns paused topic partitions
     private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
         final Set<TopicPartition> pausedTopicPartitions  = new HashSet<>(kafkaConsumer.assignment());

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
index d2c1700..4b60f33 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -18,23 +18,22 @@
 
 package org.apache.storm.kafka.spout.trident;
 
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.KafkaSpoutConfig;
-import org.apache.storm.kafka.spout.KafkaSpoutStreams;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsNamedTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutStreamsWildcardTopics;
-import org.apache.storm.kafka.spout.KafkaSpoutTuplesBuilder;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.Serializable;
-import java.util.Collection;
-import java.util.List;
-import java.util.Set;
-import java.util.regex.Pattern;
-
 public class KafkaTridentSpoutManager<K, V> implements Serializable {
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class);
 
@@ -44,29 +43,30 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
     // Bookkeeping
     private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
     // Declare some KafkaSpoutConfig references for convenience
-    private KafkaSpoutStreams kafkaSpoutStreams;                // Object that wraps all the logic to declare output fields and emit tuples
-    private KafkaSpoutTuplesBuilder<K, V> tuplesBuilder;        // Object that contains the logic to build tuples for each ConsumerRecord
+    private final Fields fields;
 
     public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
         this.kafkaSpoutConfig = kafkaSpoutConfig;
-        kafkaSpoutStreams = kafkaSpoutConfig.getKafkaSpoutStreams();
-        tuplesBuilder = kafkaSpoutConfig.getTuplesBuilder();
+        RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+        Fields fields = null;
+        for (String stream: translator.streams()) {
+            if (fields == null) {
+                fields = translator.getFieldsFor(stream);
+            } else {
+                if (!fields.equals(translator.getFieldsFor(stream))) {
+                    throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
+                }
+            }
+        }
+        this.fields = fields;
         LOG.debug("Created {}", this);
     }
 
-    void subscribeKafkaConsumer() {
+    void subscribeKafkaConsumer(TopologyContext context) {
         kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps(),
                 kafkaSpoutConfig.getKeyDeserializer(), kafkaSpoutConfig.getValueDeserializer());
 
-        if (kafkaSpoutStreams instanceof KafkaSpoutStreamsNamedTopics) {
-            final List<String> subTopics = kafkaSpoutConfig.getSubscribedTopics();
-            kafkaConsumer.subscribe(subTopics, new KafkaSpoutConsumerRebalanceListener());
-            LOG.info("Kafka consumer subscribed topics {}", subTopics);
-        } else if (kafkaSpoutStreams instanceof KafkaSpoutStreamsWildcardTopics) {
-            final Pattern pattern = kafkaSpoutConfig.getTopicWildcardPattern();
-            kafkaConsumer.subscribe(pattern, new KafkaSpoutConsumerRebalanceListener());
-            LOG.info("Kafka consumer subscribed topics matching wildcard pattern [{}]", pattern);
-        }
+        kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
 
         // Initial poll to get the consumer registration process going.
         // KafkaSpoutConsumerRebalanceListener will be called following this poll, upon partition registration
@@ -93,16 +93,12 @@ public class KafkaTridentSpoutManager<K, V> implements Serializable {
         return kafkaConsumer;
     }
 
-    KafkaSpoutTuplesBuilder<K, V> getTuplesBuilder() {
-        return tuplesBuilder;
-    }
-
     Set<TopicPartition> getTopicPartitions() {
         return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
     }
-
-    KafkaSpoutStreams getKafkaSpoutStreams() {
-        return kafkaSpoutStreams;
+    
+    Fields getFields() {
+        return fields;
     }
 
     KafkaSpoutConfig<K, V> getKafkaSpoutConfig() {

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
index b77ad11..5c5856c 100644
--- a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -19,6 +19,7 @@
 package org.apache.storm.kafka.spout.trident;
 
 import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
 import org.apache.storm.task.TopologyContext;
 import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
 import org.apache.storm.tuple.Fields;
@@ -29,12 +30,19 @@ import java.util.List;
 import java.util.Map;
 
 public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> {
+    private static final long serialVersionUID = -8003272486566259640L;
+
     private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
 
     private KafkaTridentSpoutManager<K, V> kafkaManager;
     private KafkaTridentSpoutEmitter<K, V> kafkaTridentSpoutEmitter;
     private KafkaTridentSpoutOpaqueCoordinator<K, V> coordinator;
 
+    
+    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
+        this(new KafkaTridentSpoutManager<>(conf));
+    }
+    
     public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
         this.kafkaManager = kafkaManager;
         LOG.debug("Created {}", this);
@@ -44,7 +52,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
     public Emitter<List<TopicPartition>, KafkaTridentSpoutTopicPartition, KafkaTridentSpoutBatchMetadata<K,V>> getEmitter(Map conf, TopologyContext context) {
         // Instance is created on first call rather than in constructor to avoid NotSerializableException caused by KafkaConsumer
         if (kafkaTridentSpoutEmitter == null) {
-            kafkaTridentSpoutEmitter = new KafkaTridentSpoutEmitter<>(kafkaManager);
+            kafkaTridentSpoutEmitter = new KafkaTridentSpoutEmitter<>(kafkaManager, context);
         }
         return kafkaTridentSpoutEmitter;
     }
@@ -64,7 +72,7 @@ public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSp
 
     @Override
     public Fields getOutputFields() {
-        final Fields outputFields = kafkaManager.getKafkaSpoutStreams().getOutputFields();
+        final Fields outputFields = kafkaManager.getFields();
         LOG.debug("OutputFields = {}", outputFields);
         return outputFields;
     }
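
With the new config-based constructor, wiring the opaque spout into a Trident topology becomes a one-liner. A hedged sketch; the builder call below assumes the simplified KafkaSpoutConfig builder from STORM-2225, and the broker address and topic name are illustrative:

    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.trident.KafkaTridentSpoutOpaque;
    import org.apache.storm.trident.Stream;
    import org.apache.storm.trident.TridentTopology;

    TridentTopology topology = new TridentTopology();
    KafkaSpoutConfig<String, String> conf =
            KafkaSpoutConfig.builder("localhost:9092", "my-topic").build(); // assumed builder signature
    Stream stream = topology.newStream("kafka-spout", new KafkaTridentSpoutOpaque<>(conf));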

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
new file mode 100644
index 0000000..f24fed5
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -0,0 +1,115 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
+
+    private KafkaProducer producer;
+    private OutputCollector collector;
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+
+    public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is Noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is Noop.");
+    }
+
+    public void prepare(Properties options) {
+        Objects.requireNonNull(mapper, "mapper can not be null");
+        Objects.requireNonNull(topicSelector, "topicSelector can not be null");
+        producer = new KafkaProducer(options);
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        String topic = null;
+        try {
+            List<Future<RecordMetadata>> futures = new ArrayList<>(tuples.size());
+            for (TridentTuple tuple : tuples) {
+                topic = topicSelector.getTopic(tuple);
+
+                if(topic != null) {
+                    Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
+                            mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
+                    futures.add(result);
+                } else {
+                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
+                }
+            }
+
+            List<ExecutionException> exceptions = new ArrayList<>(futures.size());
+            for (Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (ExecutionException e) {
+                    exceptions.add(e);
+                }
+            }
+
+            if(exceptions.size() > 0){
+                String errorMsg = "Could not retrieve result for messages " + tuples + " from topic = " + topic 
+                        + " because of the following exceptions: \n";
+                for (ExecutionException exception : exceptions) {
+                    errorMsg = errorMsg + exception.getMessage() + "\n";
+                }
+                LOG.error(errorMsg);
+                throw new FailedException(errorMsg);
+            }
+        } catch (Exception ex) {
+            String errorMsg = "Could not send messages " + tuples + " to topic = " + topic;
+            LOG.warn(errorMsg, ex);
+            throw new FailedException(errorMsg, ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
new file mode 100644
index 0000000..f564510
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class TridentKafkaStateFactory implements StateFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties producerProperties = new Properties();
+
+    public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public TridentKafkaStateFactory withProducerProperties(Properties props) {
+        this.producerProperties = props;
+        return this;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitionIndex={}, numPartitions={})", partitionIndex, numPartitions);
+        TridentKafkaState state = new TridentKafkaState()
+                .withKafkaTopicSelector(this.topicSelector)
+                .withTridentTupleToKafkaMapper(this.mapper);
+        state.prepare(producerProperties);
+        return state;
+    }
+}
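
Putting the pieces together, a typical (hedged) Trident write path; TridentKafkaUpdater is assumed to be the same updater class carried over from storm-kafka, and the field and topic names are illustrative:

    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
            .withProducerProperties(props)
            .withKafkaTopicSelector(new DefaultTopicSelector("out-topic"))
            .withTridentTupleToKafkaMapper(
                    new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message"));

    // 'stream' is an upstream Trident Stream emitting "key" and "message" fields.
    stream.partitionPersist(stateFactory, new Fields("key", "message"),
            new TridentKafkaUpdater(), new Fields());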

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..2d04971
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper<K, V> {
+
+    public final String keyFieldName;
+    public final String msgFieldName;
+
+    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
+        this.keyFieldName = keyFieldName;
+        this.msgFieldName = msgFieldName;
+    }
+
+    @Override
+    public K getKeyFromTuple(TridentTuple tuple) {
+        return (K) tuple.getValueByField(keyFieldName);
+    }
+
+    @Override
+    public V getMessageFromTuple(TridentTuple tuple) {
+        return (V) tuple.getValueByField(msgFieldName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
new file mode 100644
index 0000000..28c6c89
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+public interface TridentTupleToKafkaMapper<K, V> extends Serializable {
+    K getKeyFromTuple(TridentTuple tuple);
+    V getMessageFromTuple(TridentTuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..607c996
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class DefaultTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -1172454882072591493L;
+    private final String topicName;
+
+    public DefaultTopicSelector(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    @Override
+    public String getTopic(TridentTuple tuple) {
+        return topicName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..012a6c7
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+public interface KafkaTopicSelector extends Serializable {
+    String getTopic(TridentTuple tuple);
+}
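
A hypothetical companion to DefaultTopicSelector that routes on a tuple field instead of a constant (the class and field names are not from this commit):

    import org.apache.storm.trident.tuple.TridentTuple;

    public class FieldNameTopicSelector implements KafkaTopicSelector {
        private static final long serialVersionUID = 1L;
        private final String fieldName;
        private final String fallbackTopic;

        public FieldNameTopicSelector(String fieldName, String fallbackTopic) {
            this.fieldName = fieldName;
            this.fallbackTopic = fallbackTopic;
        }

        @Override
        public String getTopic(TridentTuple tuple) {
            // Use the named field when present and non-null; otherwise fall back.
            Object topic = tuple.contains(fieldName) ? tuple.getValueByField(fieldName) : null;
            return topic != null ? topic.toString() : fallbackTopic;
        }
    }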

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
new file mode 100644
index 0000000..8c8a945
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Testing;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBoltTest {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
+    
+    @SuppressWarnings({ "unchecked", "serial" })
+    @Test
+    public void testSimple() {
+        final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
+        when(producer.send((ProducerRecord<String,String>)any(), (Callback)any())).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                Callback c = (Callback)invocation.getArguments()[1];
+                c.onCompletion(null, null);
+                return null;
+            }
+        });
+        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
+            @Override
+            protected KafkaProducer<String, String> mkProducer(Properties props) {
+                return producer;
+            }
+        };
+        bolt.withTopicSelector("MY_TOPIC");
+        
+        OutputCollector collector = mock(OutputCollector.class);
+        TopologyContext context = mock(TopologyContext.class);
+        Map<String, Object> conf = new HashMap<>();
+        bolt.prepare(conf, context, collector);
+        MkTupleParam param = new MkTupleParam();
+        param.setFields("key", "message");
+        Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param);
+        bolt.execute(testTuple);
+        verify(producer).send(argThat(new ArgumentMatcher<ProducerRecord<String, String>>() {
+            @Override
+            public boolean matches(Object argument) {
+                LOG.info("GOT {} ->", argument);
+                ProducerRecord<String, String> arg = (ProducerRecord<String, String>) argument;
+                LOG.info("  {} {} {}", arg.topic(), arg.key(), arg.value());
+                return "MY_TOPIC".equals(arg.topic()) &&
+                        "KEY".equals(arg.key()) &&
+                        "VALUE".equals(arg.value());
+            }
+        }), any(Callback.class));
+        verify(collector).ack(testTuple);
+    }
+
+}
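
Beyond the mocked producer above, wiring the bolt into a topology takes producer properties, a topic selector, and a tuple-to-record mapper. A minimal sketch, assuming FieldNameBasedTupleToKafkaMapper and the fluent withProducerProperties setter carried over unchanged from storm-kafka (the String withTopicSelector overload is the one exercised by the test):

    import java.util.Properties;

    import org.apache.storm.kafka.bolt.KafkaBolt;
    import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
    import org.apache.storm.topology.TopologyBuilder;

    public class KafkaBoltWiring {
        public static void main(String[] args) {
            // Handed straight to the underlying KafkaProducer.
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

            KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
                    .withProducerProperties(props)
                    .withTopicSelector("MY_TOPIC")
                    .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message"));

            TopologyBuilder builder = new TopologyBuilder();
            // "upstream" stands in for whatever component emits tuples with
            // "key" and "message" fields.
            builder.setBolt("kafka_bolt", bolt).shuffleGrouping("upstream");
        }
    }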

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
new file mode 100644
index 0000000..ea0b6e7
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+
+public class ByTopicRecordTranslatorTest {
+    public static Func<ConsumerRecord<String, String>, List<Object>> JUST_KEY_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.key());
+        }
+    };
+    
+    public static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.value());
+        }
+    };
+    
+    public static Func<ConsumerRecord<String, String>, List<Object>> KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.key(), record.value());
+        }
+    };
+    
+    @Test
+    public void testBasic() {
+        ByTopicRecordTranslator<String, String> trans = 
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        trans.forTopic("TOPIC 1", JUST_VALUE_FUNC, new Fields("value"), "value-stream");
+        trans.forTopic("TOPIC 2", KEY_VALUE_FUNC, new Fields("key", "value"), "key-value-stream");
+        HashSet<String> expectedStreams = new HashSet<>();
+        expectedStreams.add("default");
+        expectedStreams.add("value-stream");
+        expectedStreams.add("key-value-stream");
+        assertEquals(expectedStreams, new HashSet<>(trans.streams()));
+
+        ConsumerRecord<String, String> cr1 = new ConsumerRecord<>("TOPIC OTHER", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("key"), trans.getFieldsFor("default"));
+        assertEquals(Arrays.asList("THE KEY"), trans.apply(cr1));
+        
+        ConsumerRecord<String, String> cr2 = new ConsumerRecord<>("TOPIC 1", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("value"), trans.getFieldsFor("value-stream"));
+        assertEquals(Arrays.asList("THE VALUE"), trans.apply(cr2));
+        
+        ConsumerRecord<String, String> cr3 = new ConsumerRecord<>("TOPIC 2", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("key", "value"), trans.getFieldsFor("key-value-stream"));
+        assertEquals(Arrays.asList("THE KEY", "THE VALUE"), trans.apply(cr3));
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void testFieldCollision() {
+        ByTopicRecordTranslator<String, String> trans = 
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"));
+    }
+    
+    @Test(expected = IllegalStateException.class)
+    public void testTopicCollision() {
+        ByTopicRecordTranslator<String, String> trans = 
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"), "foo1");
+        trans.forTopic("foo", KEY_VALUE_FUNC, new Fields("key", "value"), "foo2");
+    }
+
+}
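
Outside the test, a ByTopicRecordTranslator is handed to the spout through the new builder. The sketch below assumes the builder's vararg topic overload and a setRecordTranslator(RecordTranslator) variant; only the Func/Fields form of setRecordTranslator appears elsewhere in this patch:

    import java.util.List;

    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.storm.kafka.spout.ByTopicRecordTranslator;
    import org.apache.storm.kafka.spout.Func;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class ByTopicWiring {
        public static void main(String[] args) {
            Func<ConsumerRecord<String, String>, List<Object>> justValue =
                    new Func<ConsumerRecord<String, String>, List<Object>>() {
                        @Override
                        public List<Object> apply(ConsumerRecord<String, String> record) {
                            return new Values(record.value());
                        }
                    };

            // Records from "events" go to their own stream; records from any
            // other subscribed topic fall through to the default stream set
            // in the constructor.
            ByTopicRecordTranslator<String, String> trans =
                    new ByTopicRecordTranslator<>(justValue, new Fields("value"));
            trans.forTopic("events", justValue, new Fields("value"), "events-stream");

            KafkaSpoutConfig<String, String> conf =
                    KafkaSpoutConfig.builder("localhost:9092", "events", "audit")
                            .setRecordTranslator(trans)
                            .build();
        }
    }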

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
new file mode 100644
index 0000000..f4275e4
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/DefaultRecordTranslatorTest.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.*;
+
+import java.util.Arrays;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.junit.Test;
+
+public class DefaultRecordTranslatorTest {
+    @Test
+    public void testBasic() {
+        DefaultRecordTranslator<String, String> trans = new DefaultRecordTranslator<>();
+        assertEquals(Arrays.asList("default"), trans.streams());
+        assertEquals(new Fields("topic", "partition", "offset", "key", "value"), trans.getFieldsFor("default"));
+        ConsumerRecord<String, String> cr = new ConsumerRecord<>("TOPIC", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(Arrays.asList("TOPIC", 100, 100l, "THE KEY", "THE VALUE"), trans.apply(cr));
+    }
+}
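
Downstream bolts read the five default fields by name. A minimal consumer-side sketch (LogRecordBolt is illustrative, not part of this patch):

    import java.util.Map;

    import org.apache.storm.task.OutputCollector;
    import org.apache.storm.task.TopologyContext;
    import org.apache.storm.topology.OutputFieldsDeclarer;
    import org.apache.storm.topology.base.BaseRichBolt;
    import org.apache.storm.tuple.Tuple;

    public class LogRecordBolt extends BaseRichBolt {
        private OutputCollector collector;

        @Override
        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
            this.collector = collector;
        }

        @Override
        public void execute(Tuple input) {
            // The five fields declared by DefaultRecordTranslator, by name.
            String topic = input.getStringByField("topic");
            int partition = input.getIntegerByField("partition");
            long offset = input.getLongByField("offset");
            Object key = input.getValueByField("key");
            Object value = input.getValueByField("value");
            System.out.printf("%s[%d]@%d: %s -> %s%n", topic, partition, offset, key, value);
            collector.ack(input);
        }

        @Override
        public void declareOutputFields(OutputFieldsDeclarer declarer) {
            // Terminal bolt; nothing to declare.
        }
    }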

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
new file mode 100644
index 0000000..08220dd
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutConfigTest.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.*;
+
+import java.util.HashMap;
+
+import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;
+import org.junit.Test;
+
+public class KafkaSpoutConfigTest {
+
+    @Test
+    public void testBasic() {
+        KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig.builder("localhost:1234", "topic").build();
+        assertEquals(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST, conf.getFirstPollOffsetStrategy());
+        assertNull(conf.getConsumerGroupId());
+        assertTrue(conf.getTranslator() instanceof DefaultRecordTranslator);
+        HashMap<String, Object> expected = new HashMap<>();
+        expected.put("bootstrap.servers", "localhost:1234");
+        expected.put("enable.auto.commit", "false");
+        assertEquals(expected, conf.getKafkaProps());
+    }
+}
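
The test above pins down the defaults of the simplified configuration from STORM-2225. Fleshed out with the setters used elsewhere in this patch, spout construction reduces to:

    import org.apache.storm.kafka.spout.KafkaSpout;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig;
    import org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy;

    public class SpoutConfigExample {
        public static void main(String[] args) {
            KafkaSpoutConfig<String, String> conf =
                    KafkaSpoutConfig.builder("localhost:9092", "topic")
                            .setGroupId("myGroup")
                            .setOffsetCommitPeriodMs(10_000)
                            .setFirstPollOffsetStrategy(FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
                            .build();
            // Ready to hand to TopologyBuilder.setSpout.
            KafkaSpout<String, String> spout = new KafkaSpout<>(conf);
        }
    }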

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
index 2d3eb56..9969d84 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutRebalanceTest.java
@@ -15,11 +15,25 @@
  */
 package org.apache.storm.kafka.spout;
 
+import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.Matchers.hasKey;
+import static org.junit.Assert.assertThat;
+import static org.mockito.Matchers.anyCollection;
+import static org.mockito.Matchers.anyLong;
+import static org.mockito.Matchers.anyObject;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.reset;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+
 import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
 import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.kafka.clients.consumer.ConsumerRecords;
@@ -27,37 +41,16 @@ import org.apache.kafka.clients.consumer.KafkaConsumer;
 import org.apache.kafka.clients.consumer.OffsetAndMetadata;
 import org.apache.kafka.common.TopicPartition;
 import org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration;
-
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutStreams;
-
 import org.apache.storm.kafka.spout.internal.KafkaConsumerFactory;
 import org.apache.storm.spout.SpoutOutputCollector;
 import org.apache.storm.task.TopologyContext;
+import org.junit.Before;
 import org.junit.Test;
 import org.mockito.ArgumentCaptor;
-
-import static org.mockito.Matchers.anyCollection;
-import static org.mockito.Matchers.anyLong;
-import static org.mockito.Matchers.anyObject;
-import static org.mockito.Mockito.when;
-
-import org.junit.Before;
 import org.mockito.Captor;
-
-import static org.mockito.Mockito.reset;
-
+import org.mockito.Mockito;
 import org.mockito.MockitoAnnotations;
 
-import static org.apache.storm.kafka.spout.builders.SingleTopicKafkaSpoutConfiguration.getKafkaSpoutConfig;
-import static org.hamcrest.CoreMatchers.not;
-import static org.hamcrest.Matchers.hasKey;
-import static org.junit.Assert.assertThat;
-import static org.mockito.Matchers.anyList;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-
 public class KafkaSpoutRebalanceTest {
 
     @Captor
@@ -65,7 +58,7 @@ public class KafkaSpoutRebalanceTest {
 
     private TopologyContext contextMock;
     private SpoutOutputCollector collectorMock;
-    private Map conf;
+    private Map<String, Object> conf;
     private KafkaConsumer<String, String> consumerMock;
     private KafkaConsumerFactory<String, String> consumerFactoryMock;
 
@@ -113,11 +106,11 @@ public class KafkaSpoutRebalanceTest {
         //Emit the messages
         spout.nextTuple();
         ArgumentCaptor<KafkaSpoutMessageId> messageIdForRevokedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collectorMock).emit(anyString(), anyList(), messageIdForRevokedPartition.capture());
+        verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForRevokedPartition.capture());
         reset(collectorMock);
         spout.nextTuple();
         ArgumentCaptor<KafkaSpoutMessageId> messageIdForAssignedPartition = ArgumentCaptor.forClass(KafkaSpoutMessageId.class);
-        verify(collectorMock).emit(anyString(), anyList(), messageIdForAssignedPartition.capture());
+        verify(collectorMock).emit(Mockito.anyString(), Mockito.anyList(), messageIdForAssignedPartition.capture());
 
         //Now rebalance
         consumerRebalanceListener.onPartitionsRevoked(assignedPartitions);
@@ -132,7 +125,7 @@ public class KafkaSpoutRebalanceTest {
     @Test
     public void spoutMustIgnoreAcksForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Acking tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10), consumerFactoryMock);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10), consumerFactoryMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);
@@ -161,7 +154,7 @@ public class KafkaSpoutRebalanceTest {
     public void spoutMustIgnoreFailsForTuplesItIsNotAssignedAfterRebalance() throws Exception {
         //Failing tuples for partitions that are no longer assigned is useless since the spout will not be allowed to commit them if they later pass
         KafkaSpoutRetryService retryServiceMock = mock(KafkaSpoutRetryService.class);
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), -1, 10, retryServiceMock), consumerFactoryMock);
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(-1, 10, retryServiceMock), consumerFactoryMock);
         String topic = SingleTopicKafkaSpoutConfiguration.TOPIC;
         TopicPartition partitionThatWillBeRevoked = new TopicPartition(topic, 1);
         TopicPartition assignedPartition = new TopicPartition(topic, 2);

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopicsTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopicsTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopicsTest.java
deleted file mode 100644
index 723f5fd..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/KafkaSpoutStreamsNamedTopicsTest.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Copyright 2016 The Apache Software Foundation.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.storm.kafka.spout;
-
-import org.apache.storm.tuple.Fields;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class KafkaSpoutStreamsNamedTopicsTest {
-
-	@Test
-	public void testGetOutputFields() {
-		Fields outputFields = new Fields("b","a");
-		String[] topics = new String[]{"testTopic"};
-		String streamId = "test";
-		KafkaSpoutStreamsNamedTopics build = new KafkaSpoutStreamsNamedTopics.Builder(outputFields, streamId, topics)
-        .addStream(outputFields, streamId, topics)
-        .build();
-		Fields actualFields = build.getOutputFields();
-		Assert.assertEquals(outputFields.get(0), actualFields.get(0));
-		Assert.assertEquals(outputFields.get(1), actualFields.get(1));
-
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
index 6983160..f457b59 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/SingleTopicKafkaSpoutTest.java
@@ -70,7 +70,7 @@ public class SingleTopicKafkaSpoutTest {
         SpoutOutputCollector collector = mock(SpoutOutputCollector.class);
         Map conf = mock(Map.class);
 
-        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), kafkaPort));
+        KafkaSpout<String, String> spout = new KafkaSpout<>(getKafkaSpoutConfig(kafkaPort));
         spout.open(conf, topology, collector);
         spout.activate();
         return new SpoutContext(spout, collector);

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
index 95b2199..99bd3de 100644
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/SingleTopicKafkaSpoutConfiguration.java
@@ -17,17 +17,21 @@
  */
 package org.apache.storm.kafka.spout.builders;
 
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
 import org.apache.storm.Config;
 import org.apache.storm.generated.StormTopology;
-import org.apache.storm.kafka.spout.*;
+import org.apache.storm.kafka.spout.Func;
+import org.apache.storm.kafka.spout.KafkaSpout;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.KafkaSpoutRetryService;
 import org.apache.storm.kafka.spout.test.KafkaSpoutTestBolt;
 import org.apache.storm.topology.TopologyBuilder;
 import org.apache.storm.tuple.Fields;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import org.apache.storm.tuple.Values;
 
 public class SingleTopicKafkaSpoutConfiguration {
 
@@ -42,53 +46,41 @@ public class SingleTopicKafkaSpoutConfiguration {
 
     public static StormTopology getTopologyKafkaSpout(int port) {
         final TopologyBuilder tp = new TopologyBuilder();
-        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(getKafkaSpoutStreams(), port)), 1);
+        tp.setSpout("kafka_spout", new KafkaSpout<>(getKafkaSpoutConfig(port)), 1);
         tp.setBolt("kafka_bolt", new KafkaSpoutTestBolt()).shuffleGrouping("kafka_spout", STREAM);
         return tp.createTopology();
     }
 
-    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port) {
-        return getKafkaSpoutConfig(kafkaSpoutStreams, port, 10_000);
+    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port) {
+        return getKafkaSpoutConfig(port, 10_000);
     }
 
-    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs) {
-        return getKafkaSpoutConfig(kafkaSpoutStreams, port, offsetCommitPeriodMs, getRetryService());
+    static public KafkaSpoutConfig<String, String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs) {
+        return getKafkaSpoutConfig(port, offsetCommitPeriodMs, getRetryService());
     }
 
-    public static KafkaSpoutConfig<String, String> getKafkaSpoutConfig(KafkaSpoutStreams kafkaSpoutStreams, int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
-        return new KafkaSpoutConfig.Builder<>(getKafkaConsumerProps(port), kafkaSpoutStreams, getTuplesBuilder(), retryService)
-            .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
-            .setFirstPollOffsetStrategy(EARLIEST)
-            .setMaxUncommittedOffsets(250)
-            .setPollTimeoutMs(1000)
-            .build();
+    private static Func<ConsumerRecord<String, String>, List<Object>> TOPIC_KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> r) {
+            return new Values(r.topic(), r.key(), r.value());
+        }
+    };
+    
+    static public KafkaSpoutConfig<String,String> getKafkaSpoutConfig(int port, long offsetCommitPeriodMs, KafkaSpoutRetryService retryService) {
+        return KafkaSpoutConfig.builder("127.0.0.1:" + port, TOPIC)
+                .setRecordTranslator(TOPIC_KEY_VALUE_FUNC,
+                        new Fields("topic", "key", "value"), STREAM)
+                .setGroupId("kafkaSpoutTestGroup")
+                .setMaxPollRecords(5)
+                .setRetry(retryService)
+                .setOffsetCommitPeriodMs(offsetCommitPeriodMs)
+                .setFirstPollOffsetStrategy(EARLIEST)
+                .setMaxUncommittedOffsets(250)
+                .setPollTimeoutMs(1000)
+                .build();
     }
-
+
     protected static KafkaSpoutRetryService getRetryService() {
-        return new KafkaSpoutRetryExponentialBackoff(KafkaSpoutRetryExponentialBackoff.TimeInterval.microSeconds(0),
-            KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0), Integer.MAX_VALUE, KafkaSpoutRetryExponentialBackoff.TimeInterval.milliSeconds(0));
-
-    }
-
-    protected static Map<String, Object> getKafkaConsumerProps(int port) {
-        Map<String, Object> props = new HashMap<>();
-        props.put(KafkaSpoutConfig.Consumer.BOOTSTRAP_SERVERS, "127.0.0.1:" + port);
-        props.put(KafkaSpoutConfig.Consumer.GROUP_ID, "kafkaSpoutTestGroup");
-        props.put(KafkaSpoutConfig.Consumer.KEY_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put(KafkaSpoutConfig.Consumer.VALUE_DESERIALIZER, "org.apache.kafka.common.serialization.StringDeserializer");
-        props.put("max.poll.records", "5");
-        return props;
-    }
-
-    protected static KafkaSpoutTuplesBuilder<String, String> getTuplesBuilder() {
-        return new KafkaSpoutTuplesBuilderNamedTopics.Builder<>(
-            new TopicKeyValueTupleBuilder<String, String>(TOPIC))
-            .build();
-    }
-
-    public static KafkaSpoutStreams getKafkaSpoutStreams() {
-        final Fields outputFields = new Fields("topic", "key", "value");
-        return new KafkaSpoutStreamsNamedTopics.Builder(outputFields, STREAM, new String[]{TOPIC}) // contents of topics test sent to test_stream
-            .build();
+        return KafkaSpoutConfig.UNIT_TEST_RETRY_SERVICE;
     }
 }

http://git-wip-us.apache.org/repos/asf/storm/blob/a3e6f60f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
deleted file mode 100644
index 4f20b58..0000000
--- a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/builders/TopicKeyValueTupleBuilder.java
+++ /dev/null
@@ -1,40 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- *   or more contributor license agreements.  See the NOTICE file
- *   distributed with this work for additional information
- *   regarding copyright ownership.  The ASF licenses this file
- *   to you under the Apache License, Version 2.0 (the
- *   "License"); you may not use this file except in compliance
- *   with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- *   Unless required by applicable law or agreed to in writing, software
- *   distributed under the License is distributed on an "AS IS" BASIS,
- *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *   See the License for the specific language governing permissions and
- *   limitations under the License.
- */
-package org.apache.storm.kafka.spout.builders;
-
-import org.apache.kafka.clients.consumer.ConsumerRecord;
-import org.apache.storm.kafka.spout.KafkaSpoutTupleBuilder;
-import org.apache.storm.tuple.Values;
-
-import java.util.List;
-
-public class TopicKeyValueTupleBuilder<K, V> extends KafkaSpoutTupleBuilder<K,V> {
-    /**
-     * @param topics list of topics that use this implementation to build tuples
-     */
-    public TopicKeyValueTupleBuilder(String... topics) {
-        super(topics);
-    }
-
-    @Override
-    public List<Object> buildTuple(ConsumerRecord<K, V> consumerRecord) {
-        return new Values(consumerRecord.topic(),
-                consumerRecord.key(),
-                consumerRecord.value());
-    }
-}