Posted to commits@storm.apache.org by ka...@apache.org on 2018/02/08 09:05:05 UTC

[04/14] storm git commit: STORM-2937: Overwrite storm-kafka-client 1.x-branch into 1.0.x-branch: copied external/storm-kafka-client from 1.x-branch

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
new file mode 100644
index 0000000..afe8f74
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/metrics/KafkaOffsetMetric.java
@@ -0,0 +1,141 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.metrics;
+
+import com.google.common.base.Supplier;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.internal.OffsetManager;
+import org.apache.storm.metric.api.IMetric;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Computes the partition-level and topic-level offset metrics.
+ * <p>
+ * Partition-level metrics:
+ * topicName/partition_{number}/earliestTimeOffset // beginning offset of the partition
+ * topicName/partition_{number}/latestTimeOffset // end offset of the partition
+ * topicName/partition_{number}/latestEmittedOffset // latest offset emitted by the spout for the partition
+ * topicName/partition_{number}/latestCompletedOffset // latest offset committed by the spout for the partition
+ * topicName/partition_{number}/spoutLag // delta between latestTimeOffset and latestCompletedOffset
+ * topicName/partition_{number}/recordsInPartition // total number of records in the partition
+ * </p>
+ * <p>
+ * Topic-level metrics, each the sum of the corresponding partition-level metric
+ * over all partitions of the topic assigned to this spout:
+ * topicName/totalEarliestTimeOffset
+ * topicName/totalLatestTimeOffset
+ * topicName/totalLatestEmittedOffset
+ * topicName/totalLatestCompletedOffset
+ * topicName/totalSpoutLag
+ * topicName/totalRecordsInPartitions
+ * </p>
+ */
+public class KafkaOffsetMetric implements IMetric {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaOffsetMetric.class);
+    private final Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier;
+    private final Supplier<KafkaConsumer> consumerSupplier;
+
+    public KafkaOffsetMetric(Supplier<Map<TopicPartition, OffsetManager>> offsetManagerSupplier,
+            Supplier<KafkaConsumer> consumerSupplier) {
+        this.offsetManagerSupplier = offsetManagerSupplier;
+        this.consumerSupplier = consumerSupplier;
+    }
+
+    @Override
+    public Object getValueAndReset() {
+
+        Map<TopicPartition, OffsetManager> offsetManagers = offsetManagerSupplier.get();
+        KafkaConsumer kafkaConsumer = consumerSupplier.get();
+
+        if (offsetManagers == null || offsetManagers.isEmpty() || kafkaConsumer == null) {
+            LOG.debug("Metrics Tick: offsetManagers or kafkaConsumer is null.");
+            return null;
+        }
+
+        Map<String, TopicMetrics> topicMetricsMap = new HashMap<>();
+        Set<TopicPartition> topicPartitions = offsetManagers.keySet();
+
+        Map<TopicPartition, Long> beginningOffsets = kafkaConsumer.beginningOffsets(topicPartitions);
+        Map<TopicPartition, Long> endOffsets = kafkaConsumer.endOffsets(topicPartitions);
+        //map to hold partition level and topic level metrics
+        Map<String, Long> result = new HashMap<>();
+
+        for (Map.Entry<TopicPartition, OffsetManager> entry : offsetManagers.entrySet()) {
+            TopicPartition topicPartition = entry.getKey();
+            OffsetManager offsetManager = entry.getValue();
+
+            long latestTimeOffset = endOffsets.get(topicPartition);
+            long earliestTimeOffset = beginningOffsets.get(topicPartition);
+
+            long latestEmittedOffset = offsetManager.getLatestEmittedOffset();
+            long latestCompletedOffset = offsetManager.getCommittedOffset();
+            long spoutLag = latestTimeOffset - latestCompletedOffset;
+            long recordsInPartition = latestTimeOffset - earliestTimeOffset;
+
+            String metricPath = topicPartition.topic() + "/partition_" + topicPartition.partition();
+            result.put(metricPath + "/" + "spoutLag", spoutLag);
+            result.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+            result.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+            result.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+            result.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
+            result.put(metricPath + "/" + "recordsInPartition", recordsInPartition);
+
+            TopicMetrics topicMetrics = topicMetricsMap.get(topicPartition.topic());
+            if (topicMetrics == null) {
+                topicMetrics = new TopicMetrics();
+                topicMetricsMap.put(topicPartition.topic(), topicMetrics);
+            }
+
+            topicMetrics.totalSpoutLag += spoutLag;
+            topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
+            topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+            topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
+            topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
+            topicMetrics.totalRecordsInPartitions += recordsInPartition;
+        }
+
+        for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
+            String topic = e.getKey();
+            TopicMetrics topicMetrics = e.getValue();
+            result.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
+            result.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
+            result.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
+            result.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
+            result.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
+            result.put(topic + "/" + "totalRecordsInPartitions", topicMetrics.totalRecordsInPartitions);
+        }
+
+        LOG.debug("Metrics Tick: value : {}", result);
+        return result;
+    }
+
+    private static class TopicMetrics {
+        long totalSpoutLag = 0;
+        long totalEarliestTimeOffset = 0;
+        long totalLatestTimeOffset = 0;
+        long totalLatestEmittedOffset = 0;
+        long totalLatestCompletedOffset = 0;
+        long totalRecordsInPartitions = 0;
+    }
+}
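
As a hedged illustration (not part of this commit), a spout's open() method might register this metric roughly as follows, so that Storm calls getValueAndReset() once per metrics time bucket. The names context, offsetManagers, and kafkaConsumer, and the 60-second bucket, are assumptions for the sketch:

    // Inside a spout's open(Map conf, TopologyContext context, ...):
    context.registerMetric("kafkaOffset", new KafkaOffsetMetric(
        new Supplier<Map<TopicPartition, OffsetManager>>() {
            @Override
            public Map<TopicPartition, OffsetManager> get() {
                return offsetManagers; // live view of the spout's per-partition offset managers
            }
        },
        new Supplier<KafkaConsumer>() {
            @Override
            public KafkaConsumer get() {
                return kafkaConsumer; // consumer used for beginningOffsets()/endOffsets()
            }
        }), 60); // 60-second time bucket, an illustrative choice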

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
new file mode 100644
index 0000000..6fa81aa
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutBatchMetadata.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.common.TopicPartition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Wraps transaction batch information.
+ */
+public class KafkaTridentSpoutBatchMetadata implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutBatchMetadata.class);
+    private static final TopicPartitionSerializer TP_SERIALIZER = new TopicPartitionSerializer();
+    
+    public static final String TOPIC_PARTITION_KEY = "tp";
+    public static final String FIRST_OFFSET_KEY = "firstOffset";
+    public static final String LAST_OFFSET_KEY = "lastOffset";
+    
+    // topic partition of this batch
+    private final TopicPartition topicPartition;  
+    // first offset of this batch
+    private final long firstOffset;               
+    // last offset of this batch
+    private final long lastOffset;
+
+    /**
+     * Builds a metadata object.
+     * @param topicPartition The topic partition
+     * @param firstOffset The first offset for the batch
+     * @param lastOffset The last offset for the batch
+     */
+    public KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, long firstOffset, long lastOffset) {
+        this.topicPartition = topicPartition;
+        this.firstOffset = firstOffset;
+        this.lastOffset = lastOffset;
+    }
+
+    /**
+     * Builds a metadata object from a non-empty set of records.
+     * @param topicPartition The topic partition the records belong to.
+     * @param consumerRecords The non-empty set of records.
+     */
+    public <K, V> KafkaTridentSpoutBatchMetadata(TopicPartition topicPartition, ConsumerRecords<K, V> consumerRecords) {
+        Validate.notNull(consumerRecords.records(topicPartition));
+        List<ConsumerRecord<K, V>> records = consumerRecords.records(topicPartition);
+        Validate.isTrue(!records.isEmpty(), "There must be at least one record in order to build metadata");
+        
+        this.topicPartition = topicPartition;
+        firstOffset = records.get(0).offset();
+        lastOffset = records.get(records.size() - 1).offset();
+        LOG.debug("Created {}", this.toString());
+    }
+
+    public long getFirstOffset() {
+        return firstOffset;
+    }
+
+    public long getLastOffset() {
+        return lastOffset;
+    }
+
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
+    
+    /**
+     * Constructs a metadata object from a Map in the format produced by {@link #toMap() }.
+     * @param map The source map
+     * @return A new metadata object
+     */
+    public static KafkaTridentSpoutBatchMetadata fromMap(Map<String, Object> map) {
+        Map<String, Object> topicPartitionMap = (Map<String, Object>)map.get(TOPIC_PARTITION_KEY);
+        TopicPartition tp = TP_SERIALIZER.fromMap(topicPartitionMap);
+        return new KafkaTridentSpoutBatchMetadata(tp, ((Number)map.get(FIRST_OFFSET_KEY)).longValue(),
+            ((Number)map.get(LAST_OFFSET_KEY)).longValue());
+    }
+    
+    /**
+     * Writes this metadata object to a Map so Trident can read/write it to Zookeeper.
+     */
+    public Map<String, Object> toMap() {
+        Map<String, Object> map = new HashMap<>();
+        map.put(TOPIC_PARTITION_KEY, TP_SERIALIZER.toMap(topicPartition));
+        map.put(FIRST_OFFSET_KEY, firstOffset);
+        map.put(LAST_OFFSET_KEY, lastOffset);
+        return map;
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{topicPartition=" + topicPartition +
+                ", firstOffset=" + firstOffset +
+                ", lastOffset=" + lastOffset +
+                '}';
+    }
+}
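
To make the map contract concrete, here is a hedged round-trip sketch of toMap()/fromMap(), the path Trident takes when persisting batch metadata to Zookeeper; the topic name and offsets are placeholders:

    KafkaTridentSpoutBatchMetadata meta = new KafkaTridentSpoutBatchMetadata(
        new TopicPartition("orders", 0), 100, 199); // hypothetical batch
    Map<String, Object> serialized = meta.toMap();
    // serialized = {tp={topic=orders, partition=0}, firstOffset=100, lastOffset=199}
    KafkaTridentSpoutBatchMetadata restored = KafkaTridentSpoutBatchMetadata.fromMap(serialized);
    // restored.getFirstOffset() == 100 and restored.getLastOffset() == 199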

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
new file mode 100644
index 0000000..3b4aa4b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutEmitter.java
@@ -0,0 +1,279 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.TimeUnit;
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.kafka.spout.internal.Timer;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.LATEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST;
+import static org.apache.storm.kafka.spout.KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_LATEST;
+
+public class KafkaTridentSpoutEmitter<K, V> implements IOpaquePartitionedTridentSpout.Emitter<
+        List<Map<String, Object>>,
+        KafkaTridentSpoutTopicPartition,
+        Map<String, Object>>,
+        Serializable {
+
+    private static final long serialVersionUID = -7343927794834130435L;
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutEmitter.class);
+
+    // Kafka
+    private final KafkaConsumer<K, V> kafkaConsumer;
+
+    // Bookkeeping
+    private final KafkaTridentSpoutManager<K, V> kafkaManager;
+    // set of topic-partitions for which first poll has already occurred, and the first polled txid
+    private final Map<TopicPartition, Long> firstPollTransaction = new HashMap<>(); 
+
+    // Declare some KafkaTridentSpoutManager references for convenience
+    private final long pollTimeoutMs;
+    private final KafkaSpoutConfig.FirstPollOffsetStrategy firstPollOffsetStrategy;
+    private final RecordTranslator<K, V> translator;
+    private final Timer refreshSubscriptionTimer;
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+
+    private TopologyContext topologyContext;
+
+    /**
+     * Create a new Kafka spout emitter.
+     * @param kafkaManager The Kafka consumer manager to use
+     * @param topologyContext The topology context
+     * @param refreshSubscriptionTimer The timer for deciding when to recheck the subscription
+     */
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext, Timer refreshSubscriptionTimer) {
+        this.kafkaConsumer = kafkaManager.createAndSubscribeKafkaConsumer(topologyContext);
+        this.kafkaManager = kafkaManager;
+        this.topologyContext = topologyContext;
+        this.refreshSubscriptionTimer = refreshSubscriptionTimer;
+        this.translator = kafkaManager.getKafkaSpoutConfig().getTranslator();
+
+        final KafkaSpoutConfig<K, V> kafkaSpoutConfig = kafkaManager.getKafkaSpoutConfig();
+        this.pollTimeoutMs = kafkaSpoutConfig.getPollTimeoutMs();
+        this.firstPollOffsetStrategy = kafkaSpoutConfig.getFirstPollOffsetStrategy();
+        LOG.debug("Created {}", this.toString());
+    }
+
+    /**
+     * Creates an instance of this class with a default 500 millisecond refresh subscription timer.
+     */
+    public KafkaTridentSpoutEmitter(KafkaTridentSpoutManager<K, V> kafkaManager, TopologyContext topologyContext) {
+        this(kafkaManager, topologyContext, new Timer(500,
+                kafkaManager.getKafkaSpoutConfig().getPartitionRefreshPeriodMs(), TimeUnit.MILLISECONDS));
+    }
+
+    @Override
+    public Map<String, Object> emitPartitionBatch(TransactionAttempt tx, TridentCollector collector,
+            KafkaTridentSpoutTopicPartition currBatchPartition, Map<String, Object> lastBatch) {
+
+        LOG.debug("Processing batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], [collector = {}]",
+                tx, currBatchPartition, lastBatch, collector);
+
+        final TopicPartition currBatchTp = currBatchPartition.getTopicPartition();
+        final Set<TopicPartition> assignments = kafkaConsumer.assignment();
+        KafkaTridentSpoutBatchMetadata lastBatchMeta = lastBatch == null ? null : KafkaTridentSpoutBatchMetadata.fromMap(lastBatch);
+        KafkaTridentSpoutBatchMetadata currentBatch = lastBatchMeta;
+        Collection<TopicPartition> pausedTopicPartitions = Collections.emptySet();
+
+        if (assignments == null || !assignments.contains(currBatchTp)) {
+            LOG.warn("SKIPPING processing batch [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
+                            "[collector = {}] because it is not part of the assignments {} of consumer instance [{}] " +
+                            "of consumer group [{}]", tx, currBatchPartition, lastBatch, collector, assignments,
+                    kafkaConsumer, kafkaManager.getKafkaSpoutConfig().getConsumerGroupId());
+        } else {
+            try {
+                // pause other topic-partitions to only poll from current topic-partition
+                pausedTopicPartitions = pauseTopicPartitions(currBatchTp);
+
+                seek(currBatchTp, lastBatchMeta, tx.getTransactionId());
+
+                // poll
+                if (refreshSubscriptionTimer.isExpiredResetOnTrue()) {
+                    kafkaManager.getKafkaSpoutConfig().getSubscription().refreshAssignment();
+                }
+
+                final ConsumerRecords<K, V> records = kafkaConsumer.poll(pollTimeoutMs);
+                LOG.debug("Polled [{}] records from Kafka.", records.count());
+
+                if (!records.isEmpty()) {
+                    emitTuples(collector, records);
+                    // build new metadata
+                    currentBatch = new KafkaTridentSpoutBatchMetadata(currBatchTp, records);
+                }
+            } finally {
+                kafkaConsumer.resume(pausedTopicPartitions);
+                LOG.trace("Resumed topic-partitions {}", pausedTopicPartitions);
+            }
+            LOG.debug("Emitted batch: [transaction = {}], [currBatchPartition = {}], [lastBatchMetadata = {}], " +
+                    "[currBatchMetadata = {}], [collector = {}]", tx, currBatchPartition, lastBatch, currentBatch, collector);
+        }
+
+        return currentBatch == null ? null : currentBatch.toMap();
+    }
+
+    private void emitTuples(TridentCollector collector, ConsumerRecords<K, V> records) {
+        for (ConsumerRecord<K, V> record : records) {
+            final List<Object> tuple = translator.apply(record);
+            collector.emit(tuple);
+            LOG.debug("Emitted tuple {} for record [{}]", tuple, record);
+        }
+    }
+
+    /**
+     * Determines the offset of the next fetch. Uses the firstPollOffsetStrategy if this is the
+     * first poll for the topic partition. Otherwise the next offset is one past the last batch,
+     * based on lastBatchMeta.
+     *
+     * <p>lastBatchMeta should only be null in three cases: the previous txid was never emitted
+     * (e.g. a new topic), this is the first poll for this spout instance, or this is a replay of
+     * the first txid this spout emitted on this partition. In the second case there are either no
+     * previous transactions, or the master batch coordinator (MBC) is still committing them and
+     * they will fail because this spout did not emit the corresponding batches; if it had emitted
+     * them, the metadata could not be null. In any case, lastBatchMeta should never be null if
+     * this is not the first poll for this spout instance.
+     *
+     * @return the offset of the next fetch
+     */
+    private long seek(TopicPartition tp, KafkaTridentSpoutBatchMetadata lastBatchMeta, long transactionId) {
+        if (isFirstPoll(tp, transactionId)) {
+            if (firstPollOffsetStrategy == EARLIEST) {
+                LOG.debug("First poll for topic partition [{}], seeking to partition beginning", tp);
+                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == LATEST) {
+                LOG.debug("First poll for topic partition [{}], seeking to partition end", tp);
+                kafkaConsumer.seekToEnd(Collections.singleton(tp));
+            } else if (lastBatchMeta != null) {
+                LOG.debug("First poll for topic partition [{}], using last batch metadata", tp);
+                kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
+            } else if (firstPollOffsetStrategy == UNCOMMITTED_EARLIEST) {
+                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition beginning", tp);
+                kafkaConsumer.seekToBeginning(Collections.singleton(tp));
+            } else if (firstPollOffsetStrategy == UNCOMMITTED_LATEST) {
+                LOG.debug("First poll for topic partition [{}] with no last batch metadata, seeking to partition end", tp);
+                kafkaConsumer.seekToEnd(Collections.singleton(tp));
+            }
+            firstPollTransaction.put(tp, transactionId);
+        } else {
+            kafkaConsumer.seek(tp, lastBatchMeta.getLastOffset() + 1);  // seek next offset after last offset from previous batch
+            LOG.debug("Not the first poll for topic partition [{}], seeking to the offset after the previous batch", tp);
+        }
+
+        final long fetchOffset = kafkaConsumer.position(tp);
+        LOG.debug("Set [fetchOffset = {}]", fetchOffset);
+        return fetchOffset;
+    }
+
+    private boolean isFirstPoll(TopicPartition tp, long txid) {
+        // The first poll is either the "real" first transaction, or a replay of the first transaction
+        return !firstPollTransaction.containsKey(tp) || firstPollTransaction.get(tp) == txid;
+    }
+
+    // returns paused topic-partitions.
+    private Collection<TopicPartition> pauseTopicPartitions(TopicPartition excludedTp) {
+        final Set<TopicPartition> pausedTopicPartitions = new HashSet<>(kafkaConsumer.assignment());
+        LOG.debug("Currently assigned topic-partitions {}", pausedTopicPartitions);
+        pausedTopicPartitions.remove(excludedTp);
+        kafkaConsumer.pause(pausedTopicPartitions);
+        LOG.debug("Paused topic-partitions {}", pausedTopicPartitions);
+        return pausedTopicPartitions;
+    }
+
+    @Override
+    public void refreshPartitions(List<KafkaTridentSpoutTopicPartition> partitionResponsibilities) {
+        LOG.trace("Refreshing of topic-partitions handled by Kafka. " +
+                "No action taken by this method for topic partitions {}", partitionResponsibilities);
+    }
+
+    /**
+     * Computes the ordered list of topic-partitions across all tasks, built from the partition
+     * information provided by {@link KafkaTridentSpoutOpaqueCoordinator}. Partition-to-task
+     * assignment is handled by the Kafka consumer running on each task.
+     *
+     * @param allPartitionInfo list of all partitions as returned by {@link KafkaTridentSpoutOpaqueCoordinator}
+     * @return ordered list of all topic-partitions
+     */
+    @Override
+    public List<KafkaTridentSpoutTopicPartition> getOrderedPartitions(final List<Map<String, Object>> allPartitionInfo) {
+        List<TopicPartition> allTopicPartitions = new ArrayList<>();
+        for (Map<String, Object> map : allPartitionInfo) {
+            allTopicPartitions.add(tpSerializer.fromMap(map));
+        }
+        final List<KafkaTridentSpoutTopicPartition> allPartitions = newKafkaTridentSpoutTopicPartitions(allTopicPartitions);
+        LOG.debug("Returning all topic-partitions {} across all tasks. Current task index [{}]. Total tasks [{}] ",
+                allPartitions, topologyContext.getThisTaskIndex(), getNumTasks());
+        return allPartitions;
+    }
+
+    @Override
+    public List<KafkaTridentSpoutTopicPartition> getPartitionsForTask(int taskId, int numTasks,
+        List<Map<String, Object>> allPartitionInfo) {
+        final Set<TopicPartition> assignedTps = kafkaConsumer.assignment();
+        LOG.debug("Consumer [{}], running on task with index [{}], has assigned topic-partitions {}", kafkaConsumer, taskId, assignedTps);
+        final List<KafkaTridentSpoutTopicPartition> taskTps = newKafkaTridentSpoutTopicPartitions(assignedTps);
+        LOG.debug("Returning topic-partitions {} for task with index [{}]", taskTps, taskId);
+        return taskTps;
+    }
+
+    private List<KafkaTridentSpoutTopicPartition> newKafkaTridentSpoutTopicPartitions(Collection<TopicPartition> tps) {
+        final List<KafkaTridentSpoutTopicPartition> kttp = new ArrayList<>(tps == null ? 0 : tps.size());
+        if (tps != null) {
+            for (TopicPartition tp : tps) {
+                LOG.trace("Added topic-partition [{}]", tp);
+                kttp.add(new KafkaTridentSpoutTopicPartition(tp));
+            }
+        }
+        return kttp;
+    }
+
+    private int getNumTasks() {
+        return topologyContext.getComponentTasks(topologyContext.getThisComponentId()).size();
+    }
+
+    @Override
+    public void close() {
+        kafkaConsumer.close();
+        LOG.debug("Closed");
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager +
+                '}';
+    }
+}
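
The seek() logic above branches on the configured FirstPollOffsetStrategy. A hedged configuration sketch follows; the broker address and topic are placeholders:

    KafkaSpoutConfig<String, String> conf = KafkaSpoutConfig
        .builder("broker:9092", "orders")
        // EARLIEST/LATEST always seek to the partition beginning/end on the first poll;
        // the UNCOMMITTED_* variants do so only when no last batch metadata is available.
        .setFirstPollOffsetStrategy(KafkaSpoutConfig.FirstPollOffsetStrategy.UNCOMMITTED_EARLIEST)
        .build();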

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
new file mode 100644
index 0000000..26db5c9
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutManager.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.kafka.spout.RecordTranslator;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Set;
+
+public class KafkaTridentSpoutManager<K, V> implements Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutManager.class);
+
+    // Kafka
+    private transient KafkaConsumer<K, V> kafkaConsumer;
+
+    // Bookkeeping
+    private final KafkaSpoutConfig<K, V> kafkaSpoutConfig;
+    // Declare some KafkaSpoutConfig references for convenience
+    private Fields fields;
+
+    /**
+     * Create a KafkaConsumer manager for the trident spout.
+     * @param kafkaSpoutConfig The consumer config
+     */
+    public KafkaTridentSpoutManager(KafkaSpoutConfig<K, V> kafkaSpoutConfig) {
+        this.kafkaSpoutConfig = kafkaSpoutConfig;
+        this.fields = getFields();
+        LOG.debug("Created {}", this.toString());
+    }
+
+    KafkaConsumer<K,V> createAndSubscribeKafkaConsumer(TopologyContext context) {
+        kafkaConsumer = new KafkaConsumer<>(kafkaSpoutConfig.getKafkaProps());
+
+        kafkaSpoutConfig.getSubscription().subscribe(kafkaConsumer, new KafkaSpoutConsumerRebalanceListener(), context);
+        return kafkaConsumer;
+    }
+
+    KafkaConsumer<K, V> getKafkaConsumer() {
+        return kafkaConsumer;
+    }
+
+    Set<TopicPartition> getTopicPartitions() {
+        return KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();
+    }
+
+    final Fields getFields() {
+        if (fields == null) {
+            RecordTranslator<K, V> translator = kafkaSpoutConfig.getTranslator();
+            Fields fs = null;
+            for (String stream : translator.streams()) {
+                if (fs == null) {
+                    fs = translator.getFieldsFor(stream);
+                } else {
+                    if (!fs.equals(translator.getFieldsFor(stream))) {
+                        throw new IllegalArgumentException("Trident Spouts do not support multiple output Fields");
+                    }
+                }
+            }
+            fields = fs;
+        }
+        LOG.debug("OutputFields = {}", fields);
+        return fields;
+    }
+
+    KafkaSpoutConfig<K, V> getKafkaSpoutConfig() {
+        return kafkaSpoutConfig;
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaConsumer=" + kafkaConsumer +
+                ", kafkaSpoutConfig=" + kafkaSpoutConfig +
+                '}';
+    }
+
+    private class KafkaSpoutConsumerRebalanceListener implements ConsumerRebalanceListener {
+        @Override
+        public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
+            LOG.info("Partitions revoked. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.removeAll(partitions);
+        }
+
+        @Override
+        public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
+            KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(partitions);
+            LOG.info("Partitions reassignment. [consumer-group={}, consumer={}, topic-partitions={}]",
+                    kafkaSpoutConfig.getConsumerGroupId(), kafkaConsumer, partitions);
+        }
+    }
+}
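
For orientation, a hedged sketch of constructing the manager that the emitter, coordinator, and opaque spout share; the KafkaConsumer itself is transient and only created per worker via createAndSubscribeKafkaConsumer(). Broker and topic names are placeholders:

    KafkaSpoutConfig<String, String> conf =
        KafkaSpoutConfig.builder("broker:9092", "orders").build();
    KafkaTridentSpoutManager<String, String> manager = new KafkaTridentSpoutManager<>(conf);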

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
new file mode 100644
index 0000000..ecc9219
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaque.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.storm.kafka.spout.KafkaSpoutConfig;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.tuple.Fields;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaTridentSpoutOpaque<K,V> implements IOpaquePartitionedTridentSpout<List<Map<String, Object>>,
+        KafkaTridentSpoutTopicPartition, Map<String, Object>> {
+    private static final long serialVersionUID = -8003272486566259640L;
+
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaque.class);
+
+    private final KafkaTridentSpoutManager<K, V> kafkaManager;
+
+    public KafkaTridentSpoutOpaque(KafkaSpoutConfig<K, V> conf) {
+        this(new KafkaTridentSpoutManager<>(conf));
+    }
+    
+    public KafkaTridentSpoutOpaque(KafkaTridentSpoutManager<K, V> kafkaManager) {
+        this.kafkaManager = kafkaManager;
+        LOG.debug("Created {}", this.toString());
+    }
+
+    @Override
+    public Emitter<List<Map<String, Object>>, KafkaTridentSpoutTopicPartition, Map<String, Object>> getEmitter(
+            Map conf, TopologyContext context) {
+        return new KafkaTridentSpoutEmitter<>(kafkaManager, context);
+    }
+
+    @Override
+    public Coordinator<List<Map<String, Object>>> getCoordinator(Map conf, TopologyContext context) {
+        return new KafkaTridentSpoutOpaqueCoordinator<>(kafkaManager);
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        final Fields outputFields = kafkaManager.getFields();
+        LOG.debug("OutputFields = {}", outputFields);
+        return outputFields;
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager + '}';
+    }
+}
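
A hedged sketch of attaching the opaque spout to a Trident topology; the stream name, broker address, and topic are placeholders, and the built-in Debug filter simply prints each tuple:

    TridentTopology topology = new TridentTopology();
    KafkaTridentSpoutOpaque<String, String> spout = new KafkaTridentSpoutOpaque<>(
        KafkaSpoutConfig.builder("broker:9092", "orders").build());
    topology.newStream("kafka-orders", spout)
            .each(spout.getOutputFields(), new Debug());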

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
new file mode 100644
index 0000000..449e24b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutOpaqueCoordinator.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaTridentSpoutOpaqueCoordinator<K,V> implements IOpaquePartitionedTridentSpout.Coordinator<List<Map<String, Object>>>,
+        Serializable {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaTridentSpoutOpaqueCoordinator.class);
+
+    private final TopicPartitionSerializer tpSerializer = new TopicPartitionSerializer();
+    private final KafkaTridentSpoutManager<K,V> kafkaManager;
+
+    public KafkaTridentSpoutOpaqueCoordinator(KafkaTridentSpoutManager<K, V> kafkaManager) {
+        this.kafkaManager = kafkaManager;
+        LOG.debug("Created {}", this.toString());
+    }
+
+    @Override
+    public boolean isReady(long txid) {
+        LOG.debug("isReady = true");
+        return true;    // like the "old" trident kafka spout, this coordinator is always ready
+    }
+
+    @Override
+    public List<Map<String, Object>> getPartitionsForBatch() {
+        final ArrayList<TopicPartition> topicPartitions = new ArrayList<>(kafkaManager.getTopicPartitions());
+        LOG.debug("TopicPartitions for batch {}", topicPartitions);
+        List<Map<String, Object>> tps = new ArrayList<>();
+        for (TopicPartition tp : topicPartitions) {
+            tps.add(tpSerializer.toMap(tp));
+        }
+        return tps;
+    }
+
+    @Override
+    public void close() {
+        LOG.debug("Closed"); // the "old" trident kafka spout is no op like this
+    }
+
+    @Override
+    public final String toString() {
+        return super.toString() +
+                "{kafkaManager=" + kafkaManager +
+                '}';
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
new file mode 100644
index 0000000..b020bea
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartition.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.common.TopicPartition;
+import org.apache.storm.trident.spout.ISpoutPartition;
+
+import java.io.Serializable;
+
+/**
+ * {@link ISpoutPartition} that wraps {@link TopicPartition} information.
+ */
+public class KafkaTridentSpoutTopicPartition implements ISpoutPartition, Serializable {
+    private final TopicPartition topicPartition;
+
+    public KafkaTridentSpoutTopicPartition(String topic, int partition) {
+        this(new TopicPartition(topic, partition));
+    }
+
+    public KafkaTridentSpoutTopicPartition(TopicPartition topicPartition) {
+        this.topicPartition = topicPartition;
+    }
+
+    public TopicPartition getTopicPartition() {
+        return topicPartition;
+    }
+
+    @Override
+    public String getId() {
+        return topicPartition.topic() + "@" + topicPartition.partition();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        KafkaTridentSpoutTopicPartition that = (KafkaTridentSpoutTopicPartition) o;
+
+        return topicPartition != null ? topicPartition.equals(that.topicPartition) : that.topicPartition == null;
+    }
+
+    @Override
+    public int hashCode() {
+        return topicPartition != null ? topicPartition.hashCode() : 0;
+    }
+
+    @Override
+    public String toString() {
+        return topicPartition.toString();
+    }
+}
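
For illustration, the id Trident uses to track a partition joins the topic name and partition number with '@'; the topic below is hypothetical:

    KafkaTridentSpoutTopicPartition partition = new KafkaTridentSpoutTopicPartition("orders", 5);
    String id = partition.getId(); // "orders@5"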

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
new file mode 100644
index 0000000..2d50ca7
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTopicPartitionRegistry.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.kafka.common.TopicPartition;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
+public enum KafkaTridentSpoutTopicPartitionRegistry {
+    INSTANCE;
+
+    private Set<TopicPartition> topicPartitions;
+
+    KafkaTridentSpoutTopicPartitionRegistry() {
+        this.topicPartitions = new LinkedHashSet<>();
+    }
+
+    public Set<TopicPartition> getTopicPartitions() {
+        return Collections.unmodifiableSet(topicPartitions);
+    }
+
+    public void addAll(Collection<? extends TopicPartition> topicPartitions) {
+        this.topicPartitions.addAll(topicPartitions);
+    }
+
+    public void removeAll(Collection<? extends TopicPartition> topicPartitions) {
+        this.topicPartitions.removeAll(topicPartitions);
+    }
+}
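
A hedged sketch of the registry API; in normal operation only the manager's rebalance listener mutates it, and the coordinator reads it. The partition below is a placeholder:

    KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.addAll(
        Collections.singleton(new TopicPartition("orders", 0)));
    // getTopicPartitions() returns an unmodifiable view; mutate only via addAll/removeAll
    Set<TopicPartition> registered =
        KafkaTridentSpoutTopicPartitionRegistry.INSTANCE.getTopicPartitions();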

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
new file mode 100644
index 0000000..6e1c587
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/KafkaTridentSpoutTransactional.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.tuple.Fields;
+
+import java.util.Map;
+
+// TODO
+public class KafkaTridentSpoutTransactional implements IPartitionedTridentSpout {
+    @Override
+    public Coordinator getCoordinator(Map conf, TopologyContext context) {
+        return null;
+    }
+
+    @Override
+    public Emitter getEmitter(Map conf, TopologyContext context) {
+        return null;
+    }
+
+    @Override
+    public Map<String, Object> getComponentConfiguration() {
+        return null;
+    }
+
+    @Override
+    public Fields getOutputFields() {
+        return null;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
new file mode 100644
index 0000000..50e78f0
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/spout/trident/TopicPartitionSerializer.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2017 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.spout.trident;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.kafka.common.TopicPartition;
+
+public class TopicPartitionSerializer {
+
+    public static final String TOPIC_PARTITION_TOPIC_KEY = "topic";
+    public static final String TOPIC_PARTITION_PARTITION_KEY = "partition";
+
+    /**
+     * Serializes the given TopicPartition to Map so Trident can serialize it to JSON.
+     */
+    public Map<String, Object> toMap(TopicPartition topicPartition) {
+        Map<String, Object> topicPartitionMap = new HashMap<>();
+        topicPartitionMap.put(TOPIC_PARTITION_TOPIC_KEY, topicPartition.topic());
+        topicPartitionMap.put(TOPIC_PARTITION_PARTITION_KEY, topicPartition.partition());
+        return topicPartitionMap;
+    }
+
+    /**
+     * Deserializes the given map into a TopicPartition. The map keys are expected to be those produced by
+     * {@link #toMap(org.apache.kafka.common.TopicPartition)}.
+     */
+    public TopicPartition fromMap(Map<String, Object> map) {
+        return new TopicPartition((String) map.get(TOPIC_PARTITION_TOPIC_KEY),
+            ((Number) map.get(TOPIC_PARTITION_PARTITION_KEY)).intValue());
+    }
+
+}
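
A hedged round-trip sketch of the serializer; the topic and partition are placeholders:

    TopicPartitionSerializer serializer = new TopicPartitionSerializer();
    Map<String, Object> map = serializer.toMap(new TopicPartition("orders", 3));
    // map = {topic=orders, partition=3}, which Trident can serialize to JSON
    TopicPartition roundTripped = serializer.fromMap(map); // orders-3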

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
new file mode 100644
index 0000000..2e2c13b
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -0,0 +1,138 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.topology.FailedException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TridentKafkaState<K, V> implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
+
+    private KafkaProducer<K, V> producer;
+
+    private TridentTupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+
+    public TridentKafkaState<K, V> withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaState<K, V> withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is Noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is Noop.");
+    }
+
+    /**
+     * Prepare this State.
+     * @param options The KafkaProducer config.
+     */
+    public void prepare(Properties options) {
+        Objects.requireNonNull(mapper, "mapper can not be null");
+        Objects.requireNonNull(topicSelector, "topicSelector can not be null");
+        producer = new KafkaProducer<>(options);
+    }
+
+    /**
+     * Write the given tuples to Kafka.
+     * @param tuples The tuples to write.
+     * @param collector The Trident collector.
+     */
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        String topic = null;
+        try {
+            long startTime = System.currentTimeMillis();
+            int numberOfRecords = tuples.size();
+            List<Future<RecordMetadata>> futures = new ArrayList<>(numberOfRecords);
+            for (TridentTuple tuple : tuples) {
+                topic = topicSelector.getTopic(tuple);
+                V messageFromTuple = mapper.getMessageFromTuple(tuple);
+                K keyFromTuple = mapper.getKeyFromTuple(tuple);
+
+                if (topic != null) {
+                    if (messageFromTuple != null) {
+                        Future<RecordMetadata> result = producer.send(new ProducerRecord<>(topic, keyFromTuple, messageFromTuple));
+                        futures.add(result);
+                    } else {
+                        LOG.warn("skipping Message with Key {} as message was null", keyFromTuple);
+                    }
+
+                } else {
+                    LOG.warn("skipping key = {}, topic selector returned null.", keyFromTuple);
+                }
+            }
+
+            int emittedRecords = futures.size();
+            List<ExecutionException> exceptions = new ArrayList<>(emittedRecords);
+            for (Future<RecordMetadata> future : futures) {
+                try {
+                    future.get();
+                } catch (ExecutionException e) {
+                    exceptions.add(e);
+                }
+            }
+
+            if (!exceptions.isEmpty()) {
+                StringBuilder errorMsg = new StringBuilder("Could not retrieve result for messages ");
+                errorMsg.append(tuples).append(" from topic = ").append(topic)
+                        .append(" because of the following exceptions:").append(System.lineSeparator());
+
+                for (ExecutionException exception : exceptions) {
+                    errorMsg.append(exception.getMessage()).append(System.lineSeparator());
+                }
+                String message = errorMsg.toString();
+                LOG.error(message);
+                throw new FailedException(message);
+            }
+            long latestTime = System.currentTimeMillis();
+            LOG.info("Emitted record {} sucessfully in {} ms to topic {} ", emittedRecords, latestTime - startTime, topic);
+
+        } catch (Exception ex) {
+            String errorMsg = "Could not send messages " + tuples + " to topic = " + topic;
+            LOG.warn(errorMsg, ex);
+            throw new FailedException(errorMsg, ex);
+        }
+    }
+}
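
As the prepare() javadoc above notes, the options Properties object is handed straight to the KafkaProducer constructor, so the usual producer settings apply. A minimal sketch of such a configuration (the broker address and String serializers are illustrative assumptions, not part of this patch):

    // Illustrative KafkaProducer config for TridentKafkaState.prepare(Properties).
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092"); // assumed local broker
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");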

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
new file mode 100644
index 0000000..35620de
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class TridentKafkaStateFactory<K, V> implements StateFactory {
+
+    private static final long serialVersionUID = -3613240970062343385L;
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
+
+    private TridentTupleToKafkaMapper<K, V> mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties producerProperties = new Properties();
+
+    public TridentKafkaStateFactory<K, V> withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper<K, V> mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaStateFactory<K, V> withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public TridentKafkaStateFactory<K, V> withProducerProperties(Properties props) {
+        this.producerProperties = props;
+        return this;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
+        TridentKafkaState<K, V> state = new TridentKafkaState<>();
+        state.withKafkaTopicSelector(this.topicSelector)
+            .withTridentTupleToKafkaMapper(this.mapper);
+        state.prepare(producerProperties);
+        return state;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
new file mode 100644
index 0000000..19e3d33
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/TridentKafkaStateUpdater.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.storm.kafka.trident;
+
+import java.util.List;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class TridentKafkaStateUpdater<K, V> extends BaseStateUpdater<TridentKafkaState<K, V>> {
+
+    private static final long serialVersionUID = 3352659585225274402L;
+
+    @Override
+    public void updateState(TridentKafkaState<K, V> state, List<TridentTuple> tuples, TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+}
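
Taken together, the state, factory, and updater above attach to a Trident stream through partitionPersist. A minimal wiring sketch, assuming a stream that emits "key" and "message" fields, producer properties like the sketch above, and the mapper and selector implementations that follow in this patch:

    import org.apache.storm.trident.Stream;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.kafka.trident.TridentKafkaStateFactory;
    import org.apache.storm.kafka.trident.TridentKafkaStateUpdater;
    import org.apache.storm.kafka.trident.mapper.FieldNameBasedTupleToKafkaMapper;
    import org.apache.storm.kafka.trident.selector.DefaultTopicSelector;

    // "test-topic" and the field names are illustrative.
    TridentKafkaStateFactory<String, String> stateFactory = new TridentKafkaStateFactory<String, String>()
        .withProducerProperties(props)
        .withKafkaTopicSelector(new DefaultTopicSelector("test-topic"))
        .withTridentTupleToKafkaMapper(
            new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message"));
    // Each Trident batch is written to Kafka; a send failure throws FailedException,
    // which causes the batch to be replayed.
    stream.partitionPersist(stateFactory, new Fields("key", "message"),
        new TridentKafkaStateUpdater<String, String>(), new Fields());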

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..2d04971
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper<K, V> {
+
+    public final String keyFieldName;
+    public final String msgFieldName;
+
+    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
+        this.keyFieldName = keyFieldName;
+        this.msgFieldName = msgFieldName;
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public K getKeyFromTuple(TridentTuple tuple) {
+        return (K) tuple.getValueByField(keyFieldName);
+    }
+
+    @SuppressWarnings("unchecked")
+    @Override
+    public V getMessageFromTuple(TridentTuple tuple) {
+        return (V) tuple.getValueByField(msgFieldName);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
new file mode 100644
index 0000000..28c6c89
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+public interface TridentTupleToKafkaMapper<K, V> extends Serializable {
+    K getKeyFromTuple(TridentTuple tuple);
+    V getMessageFromTuple(TridentTuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..607c996
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class DefaultTopicSelector implements KafkaTopicSelector {
+    private static final long serialVersionUID = -1172454882072591493L;
+    private final String topicName;
+
+    public DefaultTopicSelector(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    @Override
+    public String getTopic(TridentTuple tuple) {
+        return topicName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..012a6c7
--- /dev/null
+++ b/external/storm-kafka-client/src/main/java/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+public interface KafkaTopicSelector extends Serializable {
+    String getTopic(TridentTuple tuple);
+}
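
Because KafkaTopicSelector is a single-method interface, per-tuple routing is easy to add alongside the constant DefaultTopicSelector above. A sketch of a selector that reads the destination topic from a tuple field (this class is illustrative, not part of the patch):

    import org.apache.storm.trident.tuple.TridentTuple;

    public class FieldNameTopicSelector implements KafkaTopicSelector {
        private static final long serialVersionUID = 1L;

        private final String fieldName;

        public FieldNameTopicSelector(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String getTopic(TridentTuple tuple) {
            // Returning null makes TridentKafkaState log a warning and skip the tuple.
            return tuple.getStringByField(fieldName);
        }
    }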

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
new file mode 100644
index 0000000..93b1040
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnit.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import kafka.admin.AdminUtils;
+import kafka.admin.RackAwareMode;
+import kafka.server.KafkaConfig;
+import kafka.server.KafkaServer;
+import kafka.utils.MockTime;
+import kafka.utils.TestUtils;
+import kafka.utils.ZKStringSerializer$;
+import kafka.utils.ZkUtils;
+import kafka.zk.EmbeddedZookeeper;
+import org.I0Itec.zkclient.ZkClient;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+
+import static org.apache.kafka.clients.CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG;
+import static org.apache.kafka.clients.producer.ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG;
+
+public class KafkaUnit {
+    private KafkaServer kafkaServer;
+    private EmbeddedZookeeper zkServer;
+    private ZkUtils zkUtils;
+    private KafkaProducer<String, String> producer;
+    private static final String ZK_HOST = "127.0.0.1";
+    private static final String KAFKA_HOST = "127.0.0.1";
+    private static final int KAFKA_PORT = 9092;
+
+    public KafkaUnit() {
+    }
+
+    public void setUp() throws IOException {
+        // setup ZK
+        zkServer = new EmbeddedZookeeper();
+        String zkConnect = ZK_HOST + ":" + zkServer.port();
+        ZkClient zkClient = new ZkClient(zkConnect, 30000, 30000, ZKStringSerializer$.MODULE$);
+        zkUtils = ZkUtils.apply(zkClient, false);
+
+        // setup Broker
+        Properties brokerProps = new Properties();
+        brokerProps.setProperty("zookeeper.connect", zkConnect);
+        brokerProps.setProperty("broker.id", "0");
+        brokerProps.setProperty("log.dirs", Files.createTempDirectory("kafka-").toAbsolutePath().toString());
+        brokerProps.setProperty("listeners", String.format("PLAINTEXT://%s:%d", KAFKA_HOST, KAFKA_PORT));
+        KafkaConfig config = new KafkaConfig(brokerProps);
+        MockTime mock = new MockTime();
+        kafkaServer = TestUtils.createServer(config, mock);
+
+        // setup default Producer
+        createProducer();
+    }
+
+    public void tearDown() {
+        closeProducer();
+        kafkaServer.shutdown();
+        zkUtils.close();
+        zkServer.shutdown();
+    }
+
+    public void createTopic(String topicName) {
+        AdminUtils.createTopic(zkUtils, topicName, 1, 1, new Properties(), RackAwareMode.Disabled$.MODULE$);
+    }
+
+    public int getKafkaPort() {
+        return KAFKA_PORT;
+    }
+
+    private void createProducer() {
+        Properties producerProps = new Properties();
+        producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
+        producerProps.setProperty(KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        producerProps.setProperty(VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
+        producer = new KafkaProducer<>(producerProps);
+    }
+
+    public void createProducer(Serializer<String> keySerializer, Serializer<String> valueSerializer) {
+        Properties producerProps = new Properties();
+        producerProps.setProperty(BOOTSTRAP_SERVERS_CONFIG, KAFKA_HOST + ":" + KAFKA_PORT);
+        producer = new KafkaProducer<>(producerProps, keySerializer, valueSerializer);
+    }
+
+    public void sendMessage(ProducerRecord producerRecord) throws InterruptedException, ExecutionException, TimeoutException {
+        producer.send(producerRecord).get(10, TimeUnit.SECONDS);
+    }
+
+    private void closeProducer() {
+        producer.close();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
new file mode 100644
index 0000000..6e90c9d
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/KafkaUnitRule.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.junit.rules.ExternalResource;
+
+import java.io.IOException;
+
+
+public class KafkaUnitRule extends ExternalResource {
+
+    private final KafkaUnit kafkaUnit;
+
+    public KafkaUnitRule() {
+        this.kafkaUnit = new KafkaUnit();
+    }
+
+    @Override
+    public void before() throws IOException {
+        kafkaUnit.setUp();
+    }
+
+    @Override
+    public void after() {
+        kafkaUnit.tearDown();
+    }
+
+    public KafkaUnit getKafkaUnit() {
+        return this.kafkaUnit;
+    }
+}
\ No newline at end of file
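
For context, KafkaUnitRule is meant to be declared as a JUnit @Rule so that each test method runs against a freshly started embedded broker. A minimal usage sketch (the test class, topic name, and record contents are illustrative):

    import org.apache.kafka.clients.producer.ProducerRecord;
    import org.junit.Rule;
    import org.junit.Test;

    public class ExampleKafkaTest {
        // Starts ZooKeeper and a broker before each test and tears them down after.
        @Rule
        public KafkaUnitRule kafkaUnitRule = new KafkaUnitRule();

        @Test
        public void producesToEmbeddedBroker() throws Exception {
            KafkaUnit kafkaUnit = kafkaUnitRule.getKafkaUnit();
            kafkaUnit.createTopic("test-topic");
            // sendMessage blocks for up to 10 seconds waiting for the broker's ack.
            kafkaUnit.sendMessage(new ProducerRecord<>("test-topic", "key", "value"));
        }
    }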

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
new file mode 100644
index 0000000..8c8a945
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/bolt/KafkaBoltTest.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.bolt;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.Callback;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.storm.Testing;
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.testing.MkTupleParam;
+import org.apache.storm.tuple.Tuple;
+import org.junit.Test;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class KafkaBoltTest {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaBoltTest.class);
+    
+    @SuppressWarnings({ "unchecked", "serial" })
+    @Test
+    public void testSimple() {
+        final KafkaProducer<String, String> producer = mock(KafkaProducer.class);
+        when(producer.send((ProducerRecord<String,String>)any(), (Callback)any())).thenAnswer(new Answer<Object>() {
+            @Override
+            public Object answer(InvocationOnMock invocation) throws Throwable {
+                Callback c = (Callback)invocation.getArguments()[1];
+                c.onCompletion(null, null);
+                return null;
+            }
+        });
+        KafkaBolt<String, String> bolt = new KafkaBolt<String, String>() {
+            @Override
+            protected KafkaProducer<String, String> mkProducer(Properties props) {
+                return producer;
+            }
+        };
+        bolt.withTopicSelector("MY_TOPIC");
+        
+        OutputCollector collector = mock(OutputCollector.class);
+        TopologyContext context = mock(TopologyContext.class);
+        Map<String, Object> conf = new HashMap<>();
+        bolt.prepare(conf, context, collector);
+        MkTupleParam param = new MkTupleParam();
+        param.setFields("key", "message");
+        Tuple testTuple = Testing.testTuple(Arrays.asList("KEY", "VALUE"), param);
+        bolt.execute(testTuple);
+        verify(producer).send(argThat(new ArgumentMatcher<ProducerRecord<String, String>>() {
+            @Override
+            public boolean matches(Object argument) {
+                LOG.info("GOT {} ->", argument);
+                ProducerRecord<String, String> arg = (ProducerRecord<String, String>) argument;
+                LOG.info("  {} {} {}", arg.topic(), arg.key(), arg.value());
+                return "MY_TOPIC".equals(arg.topic()) &&
+                        "KEY".equals(arg.key()) &&
+                        "VALUE".equals(arg.value());
+            }
+        }), any(Callback.class));
+        verify(collector).ack(testTuple);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/e16fa19f/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
new file mode 100644
index 0000000..abc58f0
--- /dev/null
+++ b/external/storm-kafka-client/src/test/java/org/apache/storm/kafka/spout/ByTopicRecordTranslatorTest.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ *   or more contributor license agreements.  See the NOTICE file
+ *   distributed with this work for additional information
+ *   regarding copyright ownership.  The ASF licenses this file
+ *   to you under the Apache License, Version 2.0 (the
+ *   "License"); you may not use this file except in compliance
+ *   with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ */
+package org.apache.storm.kafka.spout;
+
+import static org.junit.Assert.assertEquals;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+
+import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
+import org.junit.Test;
+
+public class ByTopicRecordTranslatorTest {
+    public static Func<ConsumerRecord<String, String>, List<Object>> JUST_KEY_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.key());
+        }
+    };
+    
+    public static Func<ConsumerRecord<String, String>, List<Object>> JUST_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.value());
+        }
+    };
+    
+    public static Func<ConsumerRecord<String, String>, List<Object>> KEY_VALUE_FUNC = new Func<ConsumerRecord<String, String>, List<Object>>() {
+        @Override
+        public List<Object> apply(ConsumerRecord<String, String> record) {
+            return new Values(record.key(), record.value());
+        }
+    };
+    
+    @Test
+    public void testBasic() {
+        ByTopicRecordTranslator<String, String> trans = 
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        trans.forTopic("TOPIC 1", JUST_VALUE_FUNC, new Fields("value"), "value-stream");
+        trans.forTopic("TOPIC 2", KEY_VALUE_FUNC, new Fields("key", "value"), "key-value-stream");
+        HashSet<String> expectedStreams = new HashSet<>();
+        expectedStreams.add("default");
+        expectedStreams.add("value-stream");
+        expectedStreams.add("key-value-stream");
+        assertEquals(expectedStreams, new HashSet<>(trans.streams()));
+
+        ConsumerRecord<String, String> cr1 = new ConsumerRecord<>("TOPIC OTHER", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("key"), trans.getFieldsFor("default"));
+        assertEquals(Arrays.asList("THE KEY"), trans.apply(cr1));
+        
+        ConsumerRecord<String, String> cr2 = new ConsumerRecord<>("TOPIC 1", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("value"), trans.getFieldsFor("value-stream"));
+        assertEquals(Arrays.asList("THE VALUE"), trans.apply(cr2));
+        
+        ConsumerRecord<String, String> cr3 = new ConsumerRecord<>("TOPIC 2", 100, 100, "THE KEY", "THE VALUE");
+        assertEquals(new Fields("key", "value"), trans.getFieldsFor("key-value-stream"));
+        assertEquals(Arrays.asList("THE KEY", "THE VALUE"), trans.apply(cr3));
+    }
+    
+    @Test(expected = IllegalArgumentException.class)
+    public void testFieldCollision() {
+        ByTopicRecordTranslator<String, String> trans = 
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"));
+    }
+    
+    @Test(expected = IllegalStateException.class)
+    public void testTopicCollision() {
+        ByTopicRecordTranslator<String, String> trans = 
+                new ByTopicRecordTranslator<>(JUST_KEY_FUNC, new Fields("key"));
+        trans.forTopic("foo", JUST_VALUE_FUNC, new Fields("value"), "foo1");
+        trans.forTopic("foo", KEY_VALUE_FUNC, new Fields("key", "value"), "foo2");
+    }
+
+}