Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:34 UTC

[38/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
new file mode 100644
index 0000000..6eddaf5
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.CombinedMetric;
+import org.apache.storm.metric.api.MeanReducer;
+import org.apache.storm.metric.api.ReducedMetric;
+import org.apache.storm.task.TopologyContext;
+import com.google.common.collect.ImmutableMap;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import kafka.message.MessageAndOffset;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.*;
+import org.apache.storm.kafka.TopicOffsetOutOfRangeException;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.spout.IOpaquePartitionedTridentSpout;
+import org.apache.storm.trident.spout.IPartitionedTridentSpout;
+import org.apache.storm.trident.topology.TransactionAttempt;
+
+import java.util.*;
+
+public class TridentKafkaEmitter {
+
+    public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
+
+    private DynamicPartitionConnections _connections;
+    private String _topologyName;
+    private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
+    private ReducedMetric _kafkaMeanFetchLatencyMetric;
+    private CombinedMetric _kafkaMaxFetchLatencyMetric;
+    private TridentKafkaConfig _config;
+    private String _topologyInstanceId;
+
+    public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
+        _config = config;
+        _topologyInstanceId = topologyInstanceId;
+        _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
+        _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
+        _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
+        context.registerMetric("kafkaOffset", _kafkaOffsetMetric, _config.metricsTimeBucketSizeInSecs);
+        _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), _config.metricsTimeBucketSizeInSecs);
+        _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), _config.metricsTimeBucketSizeInSecs);
+    }
+
+
+    private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+        SimpleConsumer consumer = _connections.register(partition);
+        Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
+        _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
+        return ret;
+    }
+
+    private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+        try {
+            return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
+        } catch (FailedFetchException e) {
+            LOG.warn("Failed to fetch from partition " + partition);
+            if (lastMeta == null) {
+                return null;
+            } else {
+                Map ret = new HashMap();
+                ret.put("offset", lastMeta.get("nextOffset"));
+                ret.put("nextOffset", lastMeta.get("nextOffset"));
+                ret.put("partition", partition.partition);
+                ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
+                ret.put("topic", partition.topic);
+                ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
+                return ret;
+            }
+        }
+    }
+
+    private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) {
+        long offset;
+        if (lastMeta != null) {
+            String lastInstanceId = null;
+            Map lastTopoMeta = (Map) lastMeta.get("topology");
+            if (lastTopoMeta != null) {
+                lastInstanceId = (String) lastTopoMeta.get("id");
+            }
+            if (_config.ignoreZkOffsets && !_topologyInstanceId.equals(lastInstanceId)) {
+                offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config.startOffsetTime);
+            } else {
+                offset = (Long) lastMeta.get("nextOffset");
+            }
+        } else {
+            offset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, _config);
+        }
+
+        ByteBufferMessageSet msgs = null;
+        try {
+            msgs = fetchMessages(consumer, partition, offset);
+        } catch (TopicOffsetOutOfRangeException e) {
+            long newOffset = KafkaUtils.getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
+            LOG.warn("OffsetOutOfRange: Updating offset from offset = " + offset + " to offset = " + newOffset);
+            offset = newOffset;
+            msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        }
+
+        long endoffset = offset;
+        for (MessageAndOffset msg : msgs) {
+            emit(collector, msg.message(), partition, msg.offset());
+            endoffset = msg.nextOffset();
+        }
+        Map newMeta = new HashMap();
+        newMeta.put("offset", offset);
+        newMeta.put("nextOffset", endoffset);
+        newMeta.put("instanceId", _topologyInstanceId);
+        newMeta.put("partition", partition.partition);
+        newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
+        newMeta.put("topic", partition.topic);
+        newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
+        return newMeta;
+    }
+
+    private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
+        long start = System.nanoTime();
+        ByteBufferMessageSet msgs = null;
+        msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
+        long end = System.nanoTime();
+        long millis = (end - start) / 1000000;
+        _kafkaMeanFetchLatencyMetric.update(millis);
+        _kafkaMaxFetchLatencyMetric.update(millis);
+        return msgs;
+    }
+
+    /**
+     * Re-emit the batch described by the metadata provided.
+     *
+     * @param attempt   the transaction attempt being replayed
+     * @param collector the collector to emit tuples to
+     * @param partition the Kafka partition the batch was originally read from
+     * @param meta      the metadata recorded when the batch was first emitted
+     */
+    private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
+        LOG.info("re-emitting batch, attempt " + attempt);
+        String instanceId = (String) meta.get("instanceId");
+        if (!_config.ignoreZkOffsets || instanceId.equals(_topologyInstanceId)) {
+            SimpleConsumer consumer = _connections.register(partition);
+            long offset = (Long) meta.get("offset");
+            long nextOffset = (Long) meta.get("nextOffset");
+            ByteBufferMessageSet msgs = null;
+            msgs = fetchMessages(consumer, partition, offset);
+
+            if(msgs != null) {
+                for (MessageAndOffset msg : msgs) {
+                    if (offset == nextOffset) {
+                        break;
+                    }
+                    if (offset > nextOffset) {
+                        throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
+                    }
+                    emit(collector, msg.message(), partition, msg.offset());
+                    offset = msg.nextOffset();
+                }
+            }
+        }
+    }
+
+    private void emit(TridentCollector collector, Message msg, Partition partition, long offset) {
+        Iterable<List<Object>> values;
+        if (_config.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
+            values = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _config.scheme, msg, partition, offset);
+        } else {
+            values = KafkaUtils.generateTuples(_config, msg, partition.topic);
+        }
+
+        if (values != null) {
+            for (List<Object> value : values) {
+                collector.emit(value);
+            }
+        }
+    }
+
+    private void clear() {
+        _connections.clear();
+    }
+
+    private List<Partition> orderPartitions(List<GlobalPartitionInformation> partitions) {
+        List<Partition> part = new ArrayList<Partition>();
+        for (GlobalPartitionInformation globalPartitionInformation : partitions)
+            part.addAll(globalPartitionInformation.getOrderedPartitions());
+        return part;
+    }
+
+    private void refresh(List<Partition> list) {
+        _connections.clear();
+        _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
+    }
+
+
+    public IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map> asOpaqueEmitter() {
+
+        return new IOpaquePartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
+
+            /**
+             * Emit a batch of tuples for a partition/transaction.
+             *
+             * Return the metadata describing this batch that will be used as lastPartitionMeta
+             * for defining the parameters of the next batch.
+             */
+            @Override
+            public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+                return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+            }
+
+            @Override
+            public void refreshPartitions(List<Partition> partitions) {
+                refresh(partitions);
+            }
+
+            @Override
+            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
+                return orderPartitions(partitionInformation);
+            }
+
+            @Override
+            public void close() {
+                clear();
+            }
+        };
+    }
+
+    public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
+        return new IPartitionedTridentSpout.Emitter<List<GlobalPartitionInformation>, Partition, Map>() {
+
+            /**
+             * Emit a batch of tuples for a partition/transaction that's never been emitted before.
+             * Return the metadata that can be used to reconstruct this partition/batch in the future.
+             */
+            @Override
+            public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+                return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+            }
+
+            /**
+             * Emit a batch of tuples for a partition/transaction that has been emitted before, using
+             * the metadata created when it was first emitted.
+             */
+            @Override
+            public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+                reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+            }
+
+            /**
+             * This method is called when this task is responsible for a new set of partitions. Should be used
+             * to manage things like connections to brokers.
+             */
+            @Override
+            public void refreshPartitions(List<Partition> partitions) {
+                refresh(partitions);
+            }
+
+            @Override
+            public List<Partition> getOrderedPartitions(List<GlobalPartitionInformation> partitionInformation) {
+                return orderPartitions(partitionInformation);
+            }
+
+            @Override
+            public void close() {
+                clear();
+            }
+        };
+
+    }
+
+
+}
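
For readers tracking the package move: this emitter is not used directly; it backs the Trident Kafka spouts in the same package via asOpaqueEmitter() and asTransactionalEmitter(). A minimal wiring sketch, assuming the OpaqueTridentKafkaSpout wrapper from this package (the ZooKeeper address, topic, and stream name are placeholders):

    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.kafka.trident.OpaqueTridentKafkaSpout;
    import org.apache.storm.kafka.trident.TridentKafkaConfig;
    import org.apache.storm.trident.TridentTopology;

    // Track Kafka brokers through the ZooKeeper ensemble at zk1:2181 (placeholder).
    TridentKafkaConfig spoutConf = new TridentKafkaConfig(new ZkHosts("zk1:2181"), "test-topic");
    TridentTopology topology = new TridentTopology();
    // newStream() ultimately drives emitPartitionBatch() on the emitter above.
    topology.newStream("kafkaStream", new OpaqueTridentKafkaSpout(spoutConf));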

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
new file mode 100644
index 0000000..5741dc7
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaState.java
@@ -0,0 +1,102 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.OutputCollector;
+import org.apache.storm.topology.FailedException;
+import org.apache.commons.lang.Validate;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class TridentKafkaState implements State {
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaState.class);
+
+    private KafkaProducer producer;
+    private OutputCollector collector;
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+
+    public TridentKafkaState withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaState withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    @Override
+    public void beginCommit(Long txid) {
+        LOG.debug("beginCommit is Noop.");
+    }
+
+    @Override
+    public void commit(Long txid) {
+        LOG.debug("commit is Noop.");
+    }
+
+    public void prepare(Properties options) {
+        Validate.notNull(mapper, "mapper can not be null");
+        Validate.notNull(topicSelector, "topicSelector can not be null");
+        producer = new KafkaProducer(options);
+    }
+
+    public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
+        String topic = null;
+        for (TridentTuple tuple : tuples) {
+            try {
+                topic = topicSelector.getTopic(tuple);
+
+                if(topic != null) {
+                    Future<RecordMetadata> result = producer.send(new ProducerRecord(topic,
+                            mapper.getKeyFromTuple(tuple), mapper.getMessageFromTuple(tuple)));
+                    try {
+                        result.get();
+                    } catch (ExecutionException e) {
+                        String errorMsg = "Could not retrieve result for message with key = "
+                                + mapper.getKeyFromTuple(tuple) + " from topic = " + topic;
+                        LOG.error(errorMsg, e);
+                        throw new FailedException(errorMsg, e);
+                    }
+                } else {
+                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
+                }
+            } catch (Exception ex) {
+                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
+                        + " to topic = " + topic;
+                LOG.warn(errorMsg, ex);
+                throw new FailedException(errorMsg, ex);
+            }
+        }
+    }
+}
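
Note that prepare() passes the Properties straight through to the kafka-clients KafkaProducer, and updateState() blocks on result.get() so a failed send surfaces as a FailedException and the batch is replayed. A minimal sketch of the producer configuration (the broker address and serializer choices are illustrative, not prescribed by this class):

    Properties props = new Properties();
    // Standard kafka-clients producer settings; values are examples.
    props.put("bootstrap.servers", "broker1:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");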

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
new file mode 100644
index 0000000..f564510
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaStateFactory.java
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.task.IMetricsContext;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.mapper.TridentTupleToKafkaMapper;
+import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
+import org.apache.storm.trident.state.State;
+import org.apache.storm.trident.state.StateFactory;
+
+import java.util.Map;
+import java.util.Properties;
+
+public class TridentKafkaStateFactory implements StateFactory {
+
+    private static final Logger LOG = LoggerFactory.getLogger(TridentKafkaStateFactory.class);
+
+    private TridentTupleToKafkaMapper mapper;
+    private KafkaTopicSelector topicSelector;
+    private Properties producerProperties = new Properties();
+
+    public TridentKafkaStateFactory withTridentTupleToKafkaMapper(TridentTupleToKafkaMapper mapper) {
+        this.mapper = mapper;
+        return this;
+    }
+
+    public TridentKafkaStateFactory withKafkaTopicSelector(KafkaTopicSelector selector) {
+        this.topicSelector = selector;
+        return this;
+    }
+
+    public TridentKafkaStateFactory withProducerProperties(Properties props) {
+        this.producerProperties = props;
+        return this;
+    }
+
+    @Override
+    public State makeState(Map conf, IMetricsContext metrics, int partitionIndex, int numPartitions) {
+        LOG.info("makeState(partitonIndex={}, numpartitions={}", partitionIndex, numPartitions);
+        TridentKafkaState state = new TridentKafkaState()
+                .withKafkaTopicSelector(this.topicSelector)
+                .withTridentTupleToKafkaMapper(this.mapper);
+        state.prepare(producerProperties);
+        return state;
+    }
+}
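
Putting the pieces together, the usual Trident write path is partitionPersist() on a stream whose fields match the mapper. A sketch, assuming a Stream variable named stream that carries "key" and "message" fields and the producer Properties sketched earlier (all names are illustrative):

    import org.apache.storm.tuple.Fields;

    TridentKafkaStateFactory stateFactory = new TridentKafkaStateFactory()
            .withProducerProperties(props)
            .withKafkaTopicSelector(new DefaultTopicSelector("test-topic"))
            .withTridentTupleToKafkaMapper(
                    new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message"));
    // Each batch is handed to TridentKafkaUpdater, which calls
    // TridentKafkaState.updateState() on the tuples.
    stream.partitionPersist(stateFactory, new Fields("key", "message"),
            new TridentKafkaUpdater(), new Fields());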

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
new file mode 100644
index 0000000..7a905ab
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaUpdater.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.util.List;
+
+public class TridentKafkaUpdater extends BaseStateUpdater<TridentKafkaState> {
+    @Override
+    public void updateState(TridentKafkaState state, List<TridentTuple> tuples, TridentCollector collector) {
+        state.updateState(tuples, collector);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
new file mode 100644
index 0000000..d26c341
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/ZkBrokerReader.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.DynamicBrokersReader;
+import org.apache.storm.kafka.ZkHosts;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class ZkBrokerReader implements IBrokerReader {
+
+	public static final Logger LOG = LoggerFactory.getLogger(ZkBrokerReader.class);
+
+	List<GlobalPartitionInformation> cachedBrokers = new ArrayList<GlobalPartitionInformation>();
+	DynamicBrokersReader reader;
+	long lastRefreshTimeMs;
+
+
+	long refreshMillis;
+
+	public ZkBrokerReader(Map conf, String topic, ZkHosts hosts) {
+		try {
+			reader = new DynamicBrokersReader(conf, hosts.brokerZkStr, hosts.brokerZkPath, topic);
+			cachedBrokers = reader.getBrokerInfo();
+			lastRefreshTimeMs = System.currentTimeMillis();
+			refreshMillis = hosts.refreshFreqSecs * 1000L;
+		} catch (java.net.SocketTimeoutException e) {
+			LOG.warn("Failed to update brokers", e);
+		}
+
+	}
+
+	private void refresh() {
+		long currTime = System.currentTimeMillis();
+		if (currTime > lastRefreshTimeMs + refreshMillis) {
+			try {
+				LOG.info("brokers need refreshing because " + refreshMillis + "ms have expired");
+				cachedBrokers = reader.getBrokerInfo();
+				lastRefreshTimeMs = currTime;
+			} catch (java.net.SocketTimeoutException e) {
+				LOG.warn("Failed to update brokers", e);
+			}
+		}
+	}
+	@Override
+	public GlobalPartitionInformation getBrokerForTopic(String topic) {
+		refresh();
+		for (GlobalPartitionInformation partitionInformation : cachedBrokers) {
+			if (partitionInformation.topic.equals(topic)) return partitionInformation;
+		}
+		return null;
+	}
+
+	@Override
+	public List<GlobalPartitionInformation> getAllBrokers() {
+		refresh();
+		return cachedBrokers;
+	}
+
+	@Override
+	public void close() {
+		reader.close();
+	}
+}
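
The refresh interval comes from ZkHosts.refreshFreqSecs, converted to milliseconds in the constructor above, so broker and partition metadata is served from cachedBrokers until that much time has elapsed. A short tuning sketch (the ZooKeeper address is a placeholder):

    ZkHosts hosts = new ZkHosts("zk1:2181");
    // Re-read broker info from ZooKeeper every two minutes instead of
    // the default 60 seconds.
    hosts.refreshFreqSecs = 120;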

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
new file mode 100644
index 0000000..2d04971
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/FieldNameBasedTupleToKafkaMapper.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class FieldNameBasedTupleToKafkaMapper<K, V> implements TridentTupleToKafkaMapper<K, V> {
+
+    public final String keyFieldName;
+    public final String msgFieldName;
+
+    public FieldNameBasedTupleToKafkaMapper(String keyFieldName, String msgFieldName) {
+        this.keyFieldName = keyFieldName;
+        this.msgFieldName = msgFieldName;
+    }
+
+    @Override
+    public K getKeyFromTuple(TridentTuple tuple) {
+        return (K) tuple.getValueByField(keyFieldName);
+    }
+
+    @Override
+    public V getMessageFromTuple(TridentTuple tuple) {
+        return (V) tuple.getValueByField(msgFieldName);
+    }
+}
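
Because the casts are unchecked, the declared K and V must match the runtime types stored in the named tuple fields. A one-line usage sketch (field names are illustrative):

    // Key comes from the "key" field, payload from the "message" field.
    TridentTupleToKafkaMapper<String, String> mapper =
            new FieldNameBasedTupleToKafkaMapper<String, String>("key", "message");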

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
new file mode 100644
index 0000000..28c6c89
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/mapper/TridentTupleToKafkaMapper.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.mapper;
+
+import org.apache.storm.tuple.Tuple;
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+public interface TridentTupleToKafkaMapper<K, V> extends Serializable {
+    K getKeyFromTuple(TridentTuple tuple);
+    V getMessageFromTuple(TridentTuple tuple);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
new file mode 100644
index 0000000..7ae49a3
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/DefaultTopicSelector.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+public class DefaultTopicSelector implements KafkaTopicSelector {
+
+    private final String topicName;
+
+    public DefaultTopicSelector(final String topicName) {
+        this.topicName = topicName;
+    }
+
+    @Override
+    public String getTopic(TridentTuple tuple) {
+        return topicName;
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
new file mode 100644
index 0000000..012a6c7
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/selector/KafkaTopicSelector.java
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka.trident.selector;
+
+import org.apache.storm.trident.tuple.TridentTuple;
+
+import java.io.Serializable;
+
+public interface KafkaTopicSelector extends Serializable {
+    String getTopic(TridentTuple tuple);
+}
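
DefaultTopicSelector above ignores the tuple entirely; deriving the return value from the tuple gives per-tuple routing, and TridentKafkaState skips any tuple for which the selector returns null. A sketch of a custom selector (FieldBasedTopicSelector and the field name are hypothetical, not part of this commit):

    import org.apache.storm.kafka.trident.selector.KafkaTopicSelector;
    import org.apache.storm.trident.tuple.TridentTuple;

    public class FieldBasedTopicSelector implements KafkaTopicSelector {
        private final String fieldName;

        public FieldBasedTopicSelector(String fieldName) {
            this.fieldName = fieldName;
        }

        @Override
        public String getTopic(TridentTuple tuple) {
            // Route each tuple to the topic named in one of its own fields;
            // returning null makes TridentKafkaState skip the tuple.
            return tuple.getStringByField(fieldName);
        }
    }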

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/storm/kafka/Broker.java
deleted file mode 100644
index 513ab22..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/Broker.java
+++ /dev/null
@@ -1,86 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import com.google.common.base.Objects;
-
-import java.io.Serializable;
-
-public class Broker implements Serializable, Comparable<Broker> {
-    public String host;
-    public int port;
-
-    // for kryo compatibility
-    private Broker() {
-	
-    }
-    
-    public Broker(String host, int port) {
-        this.host = host;
-        this.port = port;
-    }
-
-    public Broker(String host) {
-        this(host, 9092);
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(host, port);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final Broker other = (Broker) obj;
-        return Objects.equal(this.host, other.host) && Objects.equal(this.port, other.port);
-    }
-
-    @Override
-    public String toString() {
-        return host + ":" + port;
-    }
-
-    public static Broker fromString(String host) {
-        Broker hp;
-        String[] spec = host.split(":");
-        if (spec.length == 1) {
-            hp = new Broker(spec[0]);
-        } else if (spec.length == 2) {
-            hp = new Broker(spec[0], Integer.parseInt(spec[1]));
-        } else {
-            throw new IllegalArgumentException("Invalid host specification: " + host);
-        }
-        return hp;
-    }
-
-
-    @Override
-    public int compareTo(Broker o) {
-        if (this.host.equals(o.host)) {
-            return this.port - o.port;
-        } else {
-            return this.host.compareTo(o.host);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java b/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
deleted file mode 100644
index 1a06fc5..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.io.Serializable;
-
-
-public interface BrokerHosts extends Serializable {
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
deleted file mode 100644
index 1a7238e..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/ByteBufferSerializer.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.utils.Utils;
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.nio.ByteBuffer;
-import java.util.Map;
-
-public class ByteBufferSerializer implements Serializer<ByteBuffer> {
-  @Override
-  public void configure(Map<String, ?> map, boolean b) {
-
-  }
-
-  @Override
-  public void close() {
-
-  }
-
-  @Override
-  public byte[] serialize(String s, ByteBuffer b) {
-    return Utils.toByteArray(b);
-  }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
deleted file mode 100644
index d0f6724..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.utils.Utils;
-import com.google.common.base.Preconditions;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.GlobalPartitionInformation;
-
-import java.io.UnsupportedEncodingException;
-import java.net.SocketTimeoutException;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-
-public class DynamicBrokersReader {
-
-    public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
-
-    private CuratorFramework _curator;
-    private String _zkPath;
-    private String _topic;
-    private Boolean _isWildcardTopic;
-
-    public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
-        // Check required parameters
-        Preconditions.checkNotNull(conf, "conf cannot be null");
-
-        validateConfig(conf);
-
-        Preconditions.checkNotNull(zkStr,"zkString cannot be null");
-        Preconditions.checkNotNull(zkPath, "zkPath cannot be null");
-        Preconditions.checkNotNull(topic, "topic cannot be null");
-
-        _zkPath = zkPath;
-        _topic = topic;
-        _isWildcardTopic = Utils.getBoolean(conf.get("kafka.topic.wildcard.match"), false);
-        try {
-            _curator = CuratorFrameworkFactory.newClient(
-                    zkStr,
-                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
-                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
-                    new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
-                            Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
-            _curator.start();
-        } catch (Exception ex) {
-            LOG.error("Couldn't connect to zookeeper", ex);
-            throw new RuntimeException(ex);
-        }
-    }
-
-    /**
-     * Get all partitions with their current leaders
-     */
-    public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
-      List<String> topics =  getTopics();
-      List<GlobalPartitionInformation> partitions =  new ArrayList<GlobalPartitionInformation>();
-
-      for (String topic : topics) {
-          GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
-          try {
-              int numPartitionsForTopic = getNumPartitions(topic);
-              String brokerInfoPath = brokerPath();
-              for (int partition = 0; partition < numPartitionsForTopic; partition++) {
-                  int leader = getLeaderFor(topic,partition);
-                  String path = brokerInfoPath + "/" + leader;
-                  try {
-                      byte[] brokerData = _curator.getData().forPath(path);
-                      Broker hp = getBrokerHost(brokerData);
-                      globalPartitionInformation.addPartition(partition, hp);
-                  } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
-                      LOG.error("Node {} does not exist ", path);
-                  }
-              }
-          } catch (SocketTimeoutException e) {
-              throw e;
-          } catch (Exception e) {
-              throw new RuntimeException(e);
-          }
-          LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
-          partitions.add(globalPartitionInformation);
-      }
-        return partitions;
-    }
-
-    private int getNumPartitions(String topic) {
-        try {
-            String topicBrokersPath = partitionPath(topic);
-            List<String> children = _curator.getChildren().forPath(topicBrokersPath);
-            return children.size();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private List<String> getTopics() {
-        List<String> topics = new ArrayList<String>();
-        if (!_isWildcardTopic) {
-            topics.add(_topic);
-            return topics;
-        } else {
-            try {
-                List<String> children = _curator.getChildren().forPath(topicsPath());
-                for (String t : children) {
-                    if (t.matches(_topic)) {
-                        LOG.info(String.format("Found matching topic %s", t));
-                        topics.add(t);
-                    }
-                }
-                return topics;
-            } catch (Exception e) {
-                throw new RuntimeException(e);
-            }
-        }
-    }
-
-    public String topicsPath () {
-        return _zkPath + "/topics";
-    }
-    public String partitionPath(String topic) {
-        return topicsPath() + "/" + topic + "/partitions";
-    }
-
-    public String brokerPath() {
-        return _zkPath + "/ids";
-    }
-
-
-
-    /**
-     * get /brokers/topics/distributedTopic/partitions/1/state
-     * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
-     * @param topic
-     * @param partition
-     * @return
-     */
-    private int getLeaderFor(String topic, long partition) {
-        try {
-            String topicBrokersPath = partitionPath(topic);
-            byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
-            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
-            Integer leader = ((Number) value.get("leader")).intValue();
-            if (leader == -1) {
-                throw new RuntimeException("No leader found for partition " + partition);
-            }
-            return leader;
-        } catch (RuntimeException e) {
-            throw e;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void close() {
-        _curator.close();
-    }
-
-    /**
-     * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
-     * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
-     *
-     * @param contents
-     * @return
-     */
-    private Broker getBrokerHost(byte[] contents) {
-        try {
-            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
-            String host = (String) value.get("host");
-            Integer port = ((Long) value.get("port")).intValue();
-            return new Broker(host, port);
-        } catch (UnsupportedEncodingException e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    /**
-     * Validate required parameters in the input configuration Map
-     * @param conf
-     */
-    private void validateConfig(final Map conf) {
-        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT),
-                "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT);
-        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT),
-                "%s cannot be null", Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT);
-        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES),
-                "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_TIMES);
-        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL),
-                "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_INTERVAL);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
deleted file mode 100644
index e237a7a..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import kafka.javaapi.consumer.SimpleConsumer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.trident.IBrokerReader;
-
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-
-public class DynamicPartitionConnections {
-
-    public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
-
-    static class ConnectionInfo {
-        SimpleConsumer consumer;
-        Set<String> partitions = new HashSet<String>();
-
-        public ConnectionInfo(SimpleConsumer consumer) {
-            this.consumer = consumer;
-        }
-    }
-
-    Map<Broker, ConnectionInfo> _connections = new HashMap();
-    KafkaConfig _config;
-    IBrokerReader _reader;
-
-    public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) {
-        _config = config;
-        _reader = brokerReader;
-    }
-
-    public SimpleConsumer register(Partition partition) {
-        Broker broker = _reader.getBrokerForTopic(partition.topic).getBrokerFor(partition.partition);
-        return register(broker, partition.topic, partition.partition);
-    }
-
-    public SimpleConsumer register(Broker host, String topic, int partition) {
-        if (!_connections.containsKey(host)) {
-            _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
-        }
-        ConnectionInfo info = _connections.get(host);
-        info.partitions.add(getHashKey(topic,partition));
-        return info.consumer;
-    }
-
-    public SimpleConsumer getConnection(Partition partition) {
-        ConnectionInfo info = _connections.get(partition.host);
-        if (info != null) {
-            return info.consumer;
-        }
-        return null;
-    }
-
-    public void unregister(Broker port, String topic, int partition) {
-        ConnectionInfo info = _connections.get(port);
-        info.partitions.remove(getHashKey(topic,partition));
-        if (info.partitions.isEmpty()) {
-            info.consumer.close();
-            _connections.remove(port);
-        }
-    }
-
-    public void unregister(Partition partition) {
-        unregister(partition.host, partition.topic, partition.partition);
-    }
-
-    public void clear() {
-        for (ConnectionInfo info : _connections.values()) {
-            info.consumer.close();
-        }
-        _connections.clear();
-    }
-
-    private String getHashKey(String topic, int partition) {
-        return topic + "_" + partition;
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
deleted file mode 100644
index 5664f12..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/ExponentialBackoffMsgRetryManager.java
+++ /dev/null
@@ -1,184 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.PriorityQueue;
-import java.util.Queue;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
-
-    private final long retryInitialDelayMs;
-    private final double retryDelayMultiplier;
-    private final long retryDelayMaxMs;
-
-    private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
-    private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
-
-    public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
-        this.retryInitialDelayMs = retryInitialDelayMs;
-        this.retryDelayMultiplier = retryDelayMultiplier;
-        this.retryDelayMaxMs = retryDelayMaxMs;
-    }
-
-    @Override
-    public void failed(Long offset) {
-        MessageRetryRecord oldRecord = this.records.get(offset);
-        MessageRetryRecord newRecord = oldRecord == null ?
-                                       new MessageRetryRecord(offset) :
-                                       oldRecord.createNextRetryRecord();
-        this.records.put(offset, newRecord);
-        this.waiting.add(newRecord);
-    }
-
-    @Override
-    public void acked(Long offset) {
-        MessageRetryRecord record = this.records.remove(offset);
-        if (record != null) {
-            this.waiting.remove(record);
-        }
-    }
-
-    @Override
-    public void retryStarted(Long offset) {
-        MessageRetryRecord record = this.records.get(offset);
-        if (record == null || !this.waiting.contains(record)) {
-            throw new IllegalStateException("cannot retry a message that has not failed");
-        } else {
-            this.waiting.remove(record);
-        }
-    }
-
-    @Override
-    public Long nextFailedMessageToRetry() {
-        if (this.waiting.size() > 0) {
-            MessageRetryRecord first = this.waiting.peek();
-            if (System.currentTimeMillis() >= first.retryTimeUTC) {
-                if (this.records.containsKey(first.offset)) {
-                    return first.offset;
-                } else {
-                    // defensive programming - should be impossible
-                    this.waiting.remove(first);
-                    return nextFailedMessageToRetry();
-                }
-            }
-        }
-        return null;
-    }
-
-    @Override
-    public boolean shouldRetryMsg(Long offset) {
-        MessageRetryRecord record = this.records.get(offset);
-        return record != null &&
-                this.waiting.contains(record) &&
-                System.currentTimeMillis() >= record.retryTimeUTC;
-    }
-
-    @Override
-    public Set<Long> clearInvalidMessages(Long kafkaOffset) {
-        Set<Long> invalidOffsets = new HashSet<Long>(); 
-        for(Long offset : records.keySet()){
-            if(offset < kafkaOffset){
-                MessageRetryRecord record = this.records.remove(offset);
-                if (record != null) {
-                    this.waiting.remove(record);
-                    invalidOffsets.add(offset);
-                }
-            }
-        }
-        return invalidOffsets;
-    }
-
-    /**
-     * A MessageRetryRecord holds the data of how many times a message has
-     * failed and been retried, and when the last failure occurred.  It can
-     * determine whether it is ready to be retried by employing an exponential
-     * back-off calculation using config values stored in SpoutConfig:
-     * <ul>
-     *  <li>retryInitialDelayMs - time to delay before the first retry</li>
-     *  <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li>
-     *  <li>retryDelayMaxMs - maximum retry delay (once this delay time is reached, subsequent retries will
-     *                        delay for this amount of time every time)
-     *  </li>
-     * </ul>
-     */
-    private class MessageRetryRecord {
-        private final long offset;
-        private final int retryNum;
-        private final long retryTimeUTC;
-
-        public MessageRetryRecord(long offset) {
-            this(offset, 1);
-        }
-
-        private MessageRetryRecord(long offset, int retryNum) {
-            this.offset = offset;
-            this.retryNum = retryNum;
-            this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
-        }
-
-        /**
-         * Create a MessageRetryRecord for the next retry that should occur after this one.
-         * @return a MessageRetryRecord with the retry count incremented and the
-         *         next retry time computed from the exponential back-off
-         *         configuration (retryInitialDelayMs, retryDelayMultiplier,
-         *         retryDelayMaxMs).
-         */
-        public MessageRetryRecord createNextRetryRecord() {
-            return new MessageRetryRecord(this.offset, this.retryNum + 1);
-        }
-
-        private long calculateRetryDelay() {
-            double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
-            double delay = retryInitialDelayMs * delayMultiplier;
-            Long maxLong = Long.MAX_VALUE;
-            long delayThisRetryMs = delay >= maxLong.doubleValue()
-                                    ?  maxLong
-                                    : (long) delay;
-            return Math.min(delayThisRetryMs, retryDelayMaxMs);
-        }
-
-        @Override
-        public boolean equals(Object other) {
-            return (other instanceof MessageRetryRecord
-                    && this.offset == ((MessageRetryRecord) other).offset);
-        }
-
-        @Override
-        public int hashCode() {
-            return Long.valueOf(this.offset).hashCode();
-        }
-    }
-
-    private static class RetryTimeComparator implements Comparator<MessageRetryRecord> {
-
-        @Override
-        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
-            return Long.valueOf(record1.retryTimeUTC).compareTo(Long.valueOf(record2.retryTimeUTC));
-        }
-
-        @Override
-        public boolean equals(Object obj) {
-            return false;
-        }
-    }
-}
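
For reference, the delay formula this removed class implements is initialDelayMs * multiplier^(retryNum - 1), clamped against double-to-long overflow and then capped at retryDelayMaxMs. A minimal standalone sketch of that calculation (BackoffDelayDemo and its method are illustrative names, not part of the Storm codebase):

    import java.util.concurrent.TimeUnit;

    public class BackoffDelayDemo {

        // Mirrors calculateRetryDelay() above: initial * multiplier^(retryNum - 1),
        // clamped to Long.MAX_VALUE if the double overflows, then capped at maxDelayMs.
        static long delayForRetry(long initialDelayMs, double multiplier, long maxDelayMs, int retryNum) {
            double delay = initialDelayMs * Math.pow(multiplier, retryNum - 1);
            long clamped = delay >= (double) Long.MAX_VALUE ? Long.MAX_VALUE : (long) delay;
            return Math.min(clamped, maxDelayMs);
        }

        public static void main(String[] args) {
            // With a 1s initial delay, a 2x multiplier and a 60s cap, the schedule is
            // 1s, 2s, 4s, 8s, ... until it saturates at 60s.
            for (int retry = 1; retry <= 8; retry++) {
                System.out.printf("retry %d -> %d ms%n", retry,
                        delayForRetry(1000L, 2.0, TimeUnit.SECONDS.toMillis(60), retry));
            }
        }
    }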

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java b/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
deleted file mode 100644
index 011240e..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-public class FailedFetchException extends RuntimeException {
-
-    public FailedFetchException(String message) {
-        super(message);
-    }
-
-    public FailedFetchException(Exception e) {
-        super(e);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
deleted file mode 100644
index 30c9a24..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/FailedMsgRetryManager.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import java.util.Set;
-
-public interface FailedMsgRetryManager {
-    public void failed(Long offset);
-    public void acked(Long offset);
-    public void retryStarted(Long offset);
-    public Long nextFailedMessageToRetry();
-    public boolean shouldRetryMsg(Long offset);
-    public Set<Long> clearInvalidMessages(Long kafkaOffset);
-}
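
The interface implies a retry lifecycle: failed(offset) schedules a retry, nextFailedMessageToRetry() and shouldRetryMsg() report when the back-off has elapsed, retryStarted() marks the offset as in flight again, and acked() or clearInvalidMessages() discard it. A hedged sketch of a caller driving that lifecycle follows; RetryLoopSketch is illustrative, not Storm's actual PartitionManager logic:

    // Assumes the FailedMsgRetryManager interface shown above and the
    // single-threaded calling pattern a spout provides.
    public class RetryLoopSketch {

        private final FailedMsgRetryManager retryManager =
                new ExponentialBackoffMsgRetryManager(1000L, 2.0, 60000L);

        // A tuple failed downstream: schedule the offset for a delayed retry.
        public void onFail(long offset) {
            retryManager.failed(offset);
        }

        // Emit loop: prefer a ripe retry over fetching new messages.
        public Long pickNextOffset(long nextNewOffset) {
            Long retryOffset = retryManager.nextFailedMessageToRetry();
            if (retryOffset != null) {
                retryManager.retryStarted(retryOffset); // mark it in flight again
                return retryOffset;
            }
            return nextNewOffset;
        }

        // A tuple was acked: the offset no longer needs retrying.
        public void onAck(long offset) {
            retryManager.acked(offset);
        }

        // Kafka's earliest valid offset moved forward: drop retries that can never succeed.
        public void onEarliestOffsetAdvanced(long earliestValidOffset) {
            retryManager.clearInvalidMessages(earliestValidOffset);
        }
    }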

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java b/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
deleted file mode 100644
index 07cbd26..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/IntSerializer.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import org.apache.kafka.common.serialization.Serializer;
-
-import java.nio.ByteBuffer;
-import java.nio.IntBuffer;
-import java.util.Map;
-
-public class IntSerializer implements Serializer<Integer> {
-  @Override
-  public void configure(Map<String, ?> map, boolean b) {
-  }
-
-  @Override
-  public byte[] serialize(String topic, Integer val) {
-    byte[] r = new byte[4];
-    IntBuffer b = ByteBuffer.wrap(r).asIntBuffer();
-    b.put(val);
-    return r;
-  }
-
-  @Override
-  public void close() {
-  }
-}
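
IntSerializer above writes the int in ByteBuffer's default big-endian order. For symmetry, a matching deserializer might look like the sketch below; this IntDeserializer is not part of the codebase, it is simply the inverse operation against the same kafka-clients Deserializer interface:

    import org.apache.kafka.common.serialization.Deserializer;

    import java.nio.ByteBuffer;
    import java.util.Map;

    public class IntDeserializer implements Deserializer<Integer> {
      @Override
      public void configure(Map<String, ?> map, boolean isKey) {
      }

      @Override
      public Integer deserialize(String topic, byte[] data) {
        if (data == null || data.length < 4) {
          return null;
        }
        // ByteBuffer defaults to big-endian, matching IntSerializer above.
        return ByteBuffer.wrap(data).getInt();
      }

      @Override
      public void close() {
      }
    }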

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
deleted file mode 100644
index 49c7526..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.spout.MultiScheme;
-import backtype.storm.spout.RawMultiScheme;
-
-import java.io.Serializable;
-
-public class KafkaConfig implements Serializable {
-    private static final long serialVersionUID = 5276718734571623855L;
-    
-    public final BrokerHosts hosts;
-    public final String topic;
-    public final String clientId;
-
-    public int fetchSizeBytes = 1024 * 1024;
-    public int socketTimeoutMs = 10000;
-    public int fetchMaxWait = 10000;
-    public int bufferSizeBytes = 1024 * 1024;
-    public MultiScheme scheme = new RawMultiScheme();
-    public boolean ignoreZkOffsets = false;
-    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
-    public long maxOffsetBehind = Long.MAX_VALUE;
-    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
-    public int metricsTimeBucketSizeInSecs = 60;
-
-    public KafkaConfig(BrokerHosts hosts, String topic) {
-        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
-    }
-
-    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
-        this.hosts = hosts;
-        this.topic = topic;
-        this.clientId = clientId;
-    }
-
-}
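
A hedged usage sketch of constructing and tuning this config. ZkHosts, StringScheme and SchemeAsMultiScheme are the usual companion classes in storm-kafka (old package names, as in this commit); the connection strings and override values below are placeholders:

    import backtype.storm.spout.SchemeAsMultiScheme;
    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    public class KafkaConfigExample {
        public static KafkaConfig buildConfig() {
            // Discover brokers via ZooKeeper.
            BrokerHosts hosts = new ZkHosts("zk1:2181,zk2:2181");
            KafkaConfig config = new KafkaConfig(hosts, "my-topic", "my-client");

            // Override a few of the defaults shown above.
            config.fetchSizeBytes = 2 * 1024 * 1024;                  // larger fetches
            config.scheme = new SchemeAsMultiScheme(new StringScheme());
            config.startOffsetTime = kafka.api.OffsetRequest.LatestTime();
            return config;
        }
    }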

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
deleted file mode 100644
index 634af85..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-public enum KafkaError {
-    NO_ERROR,
-    OFFSET_OUT_OF_RANGE,
-    INVALID_MESSAGE,
-    UNKNOWN_TOPIC_OR_PARTITION,
-    INVALID_FETCH_SIZE,
-    LEADER_NOT_AVAILABLE,
-    NOT_LEADER_FOR_PARTITION,
-    REQUEST_TIMED_OUT,
-    BROKER_NOT_AVAILABLE,
-    REPLICA_NOT_AVAILABLE,
-    MESSAGE_SIZE_TOO_LARGE,
-    STALE_CONTROLLER_EPOCH,
-    OFFSET_METADATA_TOO_LARGE,
-    UNKNOWN;
-
-    public static KafkaError getError(int errorCode) {
-        if (errorCode < 0 || errorCode >= UNKNOWN.ordinal()) {
-            return UNKNOWN;
-        } else {
-            return values()[errorCode];
-        }
-    }
-}
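
getError relies on the enum ordinals matching Kafka's numeric error codes (0 = no error, 1 = offset out of range, and so on), with negative or out-of-range codes collapsing to UNKNOWN. A small illustrative check (KafkaErrorDemo is not part of the codebase):

    public class KafkaErrorDemo {
        public static void main(String[] args) {
            // 0 and 1 map by ordinal; 99 and -1 fall outside the known range.
            System.out.println(KafkaError.getError(0));   // NO_ERROR
            System.out.println(KafkaError.getError(1));   // OFFSET_OUT_OF_RANGE
            System.out.println(KafkaError.getError(99));  // UNKNOWN
            System.out.println(KafkaError.getError(-1));  // UNKNOWN
        }
    }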

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
deleted file mode 100644
index 8169014..0000000
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package storm.kafka;
-
-import backtype.storm.Config;
-import backtype.storm.metric.api.IMetric;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.topology.base.BaseRichSpout;
-import com.google.common.base.Strings;
-import kafka.message.Message;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import storm.kafka.PartitionManager.KafkaMessageId;
-
-import java.util.*;
-
-// TODO: need to add blacklisting
-// TODO: need to make a best effort to not re-emit messages if we don't have to
-public class KafkaSpout extends BaseRichSpout {
-    static enum EmitState {
-        EMITTED_MORE_LEFT,
-        EMITTED_END,
-        NO_EMITTED
-    }
-
-    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
-
-    SpoutConfig _spoutConfig;
-    SpoutOutputCollector _collector;
-    PartitionCoordinator _coordinator;
-    DynamicPartitionConnections _connections;
-    ZkState _state;
-
-    long _lastUpdateMs = 0;
-
-    int _currPartitionIndex = 0;
-
-    public KafkaSpout(SpoutConfig spoutConf) {
-        _spoutConfig = spoutConf;
-    }
-
-    @Override
-    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
-        _collector = collector;
-        String topologyInstanceId = context.getStormId();
-        Map stateConf = new HashMap(conf);
-        List<String> zkServers = _spoutConfig.zkServers;
-        if (zkServers == null) {
-            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
-        }
-        Integer zkPort = _spoutConfig.zkPort;
-        if (zkPort == null) {
-            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
-        }
-        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
-        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
-        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
-        _state = new ZkState(stateConf);
-
-        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
-
-        // using TransactionalState like this is a hack
-        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
-        if (_spoutConfig.hosts instanceof StaticHosts) {
-            _coordinator = new StaticCoordinator(_connections, conf,
-                    _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, topologyInstanceId);
-        } else {
-            _coordinator = new ZkCoordinator(_connections, conf,
-                    _spoutConfig, _state, context.getThisTaskIndex(),
-                    totalTasks, topologyInstanceId);
-        }
-
-        context.registerMetric("kafkaOffset", new IMetric() {
-            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
-
-            @Override
-            public Object getValueAndReset() {
-                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
-                Set<Partition> latestPartitions = new HashSet();
-                for (PartitionManager pm : pms) {
-                    latestPartitions.add(pm.getPartition());
-                }
-                _kafkaOffsetMetric.refreshPartitions(latestPartitions);
-                for (PartitionManager pm : pms) {
-                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
-                }
-                return _kafkaOffsetMetric.getValueAndReset();
-            }
-        }, _spoutConfig.metricsTimeBucketSizeInSecs);
-
-        context.registerMetric("kafkaPartition", new IMetric() {
-            @Override
-            public Object getValueAndReset() {
-                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
-                Map concatMetricsDataMaps = new HashMap();
-                for (PartitionManager pm : pms) {
-                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
-                }
-                return concatMetricsDataMaps;
-            }
-        }, _spoutConfig.metricsTimeBucketSizeInSecs);
-    }
-
-    @Override
-    public void close() {
-        _state.close();
-    }
-
-    @Override
-    public void nextTuple() {
-        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
-        for (int i = 0; i < managers.size(); i++) {
-
-            try {
-                // in case the number of managers decreased
-                _currPartitionIndex = _currPartitionIndex % managers.size();
-                EmitState state = managers.get(_currPartitionIndex).next(_collector);
-                if (state != EmitState.EMITTED_MORE_LEFT) {
-                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
-                }
-                if (state != EmitState.NO_EMITTED) {
-                    break;
-                }
-            } catch (FailedFetchException e) {
-                LOG.warn("Fetch failed", e);
-                _coordinator.refresh();
-            }
-        }
-
-        long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
-
-        /*
-             Since System.currentTimeMillis() depends on the system clock, also check
-             for a negative diffWithNow in case the clock was adjusted externally.
-         */
-        if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
-            commit();
-        }
-    }
-
-    @Override
-    public void ack(Object msgId) {
-        KafkaMessageId id = (KafkaMessageId) msgId;
-        PartitionManager m = _coordinator.getManager(id.partition);
-        if (m != null) {
-            m.ack(id.offset);
-        }
-    }
-
-    @Override
-    public void fail(Object msgId) {
-        KafkaMessageId id = (KafkaMessageId) msgId;
-        PartitionManager m = _coordinator.getManager(id.partition);
-        if (m != null) {
-            m.fail(id.offset);
-        }
-    }
-
-    @Override
-    public void deactivate() {
-        commit();
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-       if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
-            declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields());
-        } else {
-            declarer.declare(_spoutConfig.scheme.getOutputFields());
-        }
-    }
-
-    private void commit() {
-        _lastUpdateMs = System.currentTimeMillis();
-        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
-            manager.commit();
-        }
-    }
-
-}
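
For context, a minimal wiring of this spout into a topology, using the old package names this commit migrates away from. SpoutConfig extends KafkaConfig with the zkRoot and id fields read in open() above; all names and values below are illustrative:

    import backtype.storm.Config;
    import backtype.storm.LocalCluster;
    import backtype.storm.spout.SchemeAsMultiScheme;
    import backtype.storm.topology.TopologyBuilder;
    import storm.kafka.KafkaSpout;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringScheme;
    import storm.kafka.ZkHosts;

    public class KafkaSpoutTopology {
        public static void main(String[] args) throws Exception {
            SpoutConfig spoutConfig = new SpoutConfig(
                    new ZkHosts("zk1:2181"),   // broker discovery via ZooKeeper
                    "my-topic",
                    "/kafka-spout",            // zkRoot under which offsets are stored
                    "my-spout-id");
            spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka", new KafkaSpout(spoutConfig), 2);
            // ... bolts would be attached here ...

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-demo", new Config(), builder.createTopology());
        }
    }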