Posted to commits@storm.apache.org by ka...@apache.org on 2018/07/19 03:35:00 UTC

[4/7] storm git commit: STORM-2953: Remove storm-kafka

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
deleted file mode 100644
index 38958b2..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
+++ /dev/null
@@ -1,288 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import com.google.common.base.Preconditions;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.net.SocketTimeoutException;
-import java.nio.ByteBuffer;
-import java.nio.channels.UnresolvedAddressException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-import kafka.api.FetchRequest;
-import kafka.api.FetchRequestBuilder;
-import kafka.api.PartitionOffsetRequestInfo;
-import kafka.common.TopicAndPartition;
-import kafka.javaapi.FetchResponse;
-import kafka.javaapi.OffsetRequest;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.Message;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-import org.apache.storm.kafka.trident.IBrokerReader;
-import org.apache.storm.kafka.trident.StaticBrokerReader;
-import org.apache.storm.kafka.trident.ZkBrokerReader;
-import org.apache.storm.metric.api.IMetric;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-public class KafkaUtils {
-
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
-    private static final int NO_OFFSET = -5;
-
-    //suppress default constructor for noninstantiability
-    private KafkaUtils() {
-        throw new AssertionError();
-    }
-
-    public static IBrokerReader makeBrokerReader(Map<String, Object> topoConf, KafkaConfig conf) {
-        if (conf.hosts instanceof StaticHosts) {
-            return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation());
-        } else {
-            return new ZkBrokerReader(topoConf, conf.topic, (ZkHosts) conf.hosts);
-        }
-    }
-
-
-    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
-        long startOffsetTime = config.startOffsetTime;
-        return getOffset(consumer, topic, partition, startOffsetTime);
-    }
-
-    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
-        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
-        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
-        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
-        OffsetRequest request = new OffsetRequest(
-            requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
-
-        long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
-        if (offsets.length > 0) {
-            return offsets[0];
-        } else {
-            return NO_OFFSET;
-        }
-    }
-
-    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)
-        throws TopicOffsetOutOfRangeException, FailedFetchException, RuntimeException {
-        ByteBufferMessageSet msgs = null;
-        String topic = partition.topic;
-        int partitionId = partition.partition;
-        FetchRequestBuilder builder = new FetchRequestBuilder();
-        FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
-            clientId(config.clientId).maxWait(config.fetchMaxWait).minBytes(config.minFetchByte).build();
-        FetchResponse fetchResponse;
-        try {
-            fetchResponse = consumer.fetch(fetchRequest);
-        } catch (Exception e) {
-            if (e instanceof ConnectException ||
-                e instanceof SocketTimeoutException ||
-                e instanceof IOException ||
-                e instanceof UnresolvedAddressException
-                ) {
-                LOG.warn("Network error when fetching messages:", e);
-                throw new FailedFetchException(e);
-            } else {
-                throw new RuntimeException(e);
-            }
-        }
-        if (fetchResponse.hasError()) {
-            KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
-            if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
-                String msg = partition + " Got fetch request with offset out of range: [" + offset + "]";
-                LOG.warn(msg);
-                throw new TopicOffsetOutOfRangeException(msg);
-            } else {
-                String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
-                LOG.error(message);
-                throw new FailedFetchException(message);
-            }
-        } else {
-            msgs = fetchResponse.messageSet(topic, partitionId);
-        }
-        LOG.debug("Messages fetched. [config = {}], [consumer = {}], [partition = {}], [offset = {}], [msgs = {}]", config, consumer,
-                  partition, offset, msgs);
-        return msgs;
-    }
-
-    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) {
-        Iterable<List<Object>> tups;
-        ByteBuffer payload = msg.payload();
-        if (payload == null) {
-            return null;
-        }
-        ByteBuffer key = msg.key();
-        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
-            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
-        } else {
-            if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
-                tups = ((StringMultiSchemeWithTopic) kafkaConfig.scheme).deserializeWithTopic(topic, payload);
-            } else {
-                tups = kafkaConfig.scheme.deserialize(payload);
-            }
-        }
-        return tups;
-    }
-
-    public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition,
-                                                        long offset) {
-        ByteBuffer payload = msg.payload();
-        if (payload == null) {
-            return null;
-        }
-        return scheme.deserializeMessageWithMetadata(payload, partition, offset);
-    }
-
-    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitionInfos,
-                                                             int totalTasks, int taskIndex, int taskId) {
-        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less than total tasks");
-        List<Partition> taskPartitions = new ArrayList<Partition>();
-        List<Partition> partitions = new ArrayList<Partition>();
-        for (GlobalPartitionInformation partitionInformation : partitionInfos) {
-            partitions.addAll(partitionInformation.getOrderedPartitions());
-        }
-        int numPartitions = partitions.size();
-        if (numPartitions < totalTasks) {
-            LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions +
-                     "), some tasks will be idle");
-        }
-        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
-            Partition taskPartition = partitions.get(i);
-            taskPartitions.add(taskPartition);
-        }
-        logPartitionMapping(totalTasks, taskIndex, taskPartitions, taskId);
-        return taskPartitions;
-    }
-
-    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions, int taskId) {
-        String taskPrefix = taskPrefix(taskIndex, totalTasks, taskId);
-        if (taskPartitions.isEmpty()) {
-            LOG.warn(taskPrefix + " no partitions assigned");
-        } else {
-            LOG.info(taskPrefix + " assigned " + taskPartitions);
-        }
-    }
-
-    public static String taskPrefix(int taskIndex, int totalTasks, int taskId) {
-        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "], Task-ID: " + taskId;
-    }
-
-    public static class KafkaOffsetMetric implements IMetric {
-        Map<Partition, PartitionManager.OffsetData> _partitionToOffset = new HashMap<Partition, PartitionManager.OffsetData>();
-        Set<Partition> _partitions;
-        DynamicPartitionConnections _connections;
-
-        public KafkaOffsetMetric(DynamicPartitionConnections connections) {
-            _connections = connections;
-        }
-
-        public void setOffsetData(Partition partition, PartitionManager.OffsetData offsetData) {
-            _partitionToOffset.put(partition, offsetData);
-        }
-
-        @Override
-        public Object getValueAndReset() {
-            try {
-                HashMap<String, Long> ret = new HashMap<>();
-                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
-                    Map<String, TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>();
-                    for (Map.Entry<Partition, PartitionManager.OffsetData> e : _partitionToOffset.entrySet()) {
-                        Partition partition = e.getKey();
-                        SimpleConsumer consumer = _connections.getConnection(partition);
-                        if (consumer == null) {
-                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
-                            return null;
-                        }
-                        long latestTimeOffset =
-                            getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
-                        long earliestTimeOffset =
-                            getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
-                        if (latestTimeOffset == KafkaUtils.NO_OFFSET) {
-                            LOG.warn("No data found in Kafka Partition " + partition.getId());
-                            return null;
-                        }
-                        long latestEmittedOffset = e.getValue().latestEmittedOffset;
-                        long latestCompletedOffset = e.getValue().latestCompletedOffset;
-                        long spoutLag = latestTimeOffset - latestCompletedOffset;
-                        String topic = partition.topic;
-                        String metricPath = partition.getId();
-                        //Handle the case where the partition path id does not contain the topic name, i.e. Partition.getId() == "partition_" + partition
-                        if (!metricPath.startsWith(topic + "/")) {
-                            metricPath = topic + "/" + metricPath;
-                        }
-                        ret.put(metricPath + "/" + "spoutLag", spoutLag);
-                        ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
-                        ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
-                        ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
-                        ret.put(metricPath + "/" + "latestCompletedOffset", latestCompletedOffset);
-
-                        if (!topicMetricsMap.containsKey(partition.topic)) {
-                            topicMetricsMap.put(partition.topic, new TopicMetrics());
-                        }
-
-                        TopicMetrics topicMetrics = topicMetricsMap.get(partition.topic);
-                        topicMetrics.totalSpoutLag += spoutLag;
-                        topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
-                        topicMetrics.totalLatestTimeOffset += latestTimeOffset;
-                        topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
-                        topicMetrics.totalLatestCompletedOffset += latestCompletedOffset;
-                    }
-
-                    for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
-                        String topic = e.getKey();
-                        TopicMetrics topicMetrics = e.getValue();
-                        ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
-                        ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
-                        ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
-                        ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
-                        ret.put(topic + "/" + "totalLatestCompletedOffset", topicMetrics.totalLatestCompletedOffset);
-                    }
-
-                    return ret;
-                } else {
-                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
-                }
-            } catch (Throwable t) {
-                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
-            }
-            return null;
-        }
-
-        public void refreshPartitions(Set<Partition> partitions) {
-            _partitions = partitions;
-            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
-            while (it.hasNext()) {
-                if (!partitions.contains(it.next())) {
-                    it.remove();
-                }
-            }
-        }
-
-        private class TopicMetrics {
-            long totalSpoutLag = 0;
-            long totalEarliestTimeOffset = 0;
-            long totalLatestTimeOffset = 0;
-            long totalLatestEmittedOffset = 0;
-            long totalLatestCompletedOffset = 0;
-        }
-    }
-}
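
For context, the partition assignment removed above is a plain round-robin over the globally ordered partition list: task i takes partitions i, i + totalTasks, i + 2*totalTasks, and so on, and any task beyond the partition count sits idle. A self-contained sketch of the same scheme (class and method names here are illustrative, not part of the removed API):

    import java.util.ArrayList;
    import java.util.List;

    public class RoundRobinAssignmentDemo {
        // Same scheme as the removed KafkaUtils.calculatePartitionsForTask:
        // task i takes every totalTasks-th partition, starting at index i.
        static List<Integer> partitionsForTask(int numPartitions, int totalTasks, int taskIndex) {
            List<Integer> mine = new ArrayList<>();
            for (int i = taskIndex; i < numPartitions; i += totalTasks) {
                mine.add(i);
            }
            return mine;
        }

        public static void main(String[] args) {
            System.out.println(partitionsForTask(5, 2, 0)); // [0, 2, 4]
            System.out.println(partitionsForTask(5, 2, 1)); // [1, 3]
        }
    }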

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
deleted file mode 100644
index 6bb1dc5..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.storm.spout.Scheme;
-
-public interface KeyValueScheme extends Scheme {
-    List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
-}
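
A typical implementation decoded both buffers into tuple fields, using deserializeKeyAndValue on the keyed path and the inherited Scheme.deserialize as the key-less fallback. A minimal sketch against the removed interface (the UTF-8 decoding and field names are illustrative assumptions):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.storm.kafka.KeyValueScheme;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class Utf8KeyValueScheme implements KeyValueScheme {
        private static String asString(ByteBuffer buf) {
            byte[] bytes = new byte[buf.remaining()];
            buf.duplicate().get(bytes); // leave the caller's position untouched
            return new String(bytes, StandardCharsets.UTF_8);
        }

        @Override
        public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
            return new Values(key == null ? null : asString(key), asString(value));
        }

        @Override
        public List<Object> deserialize(ByteBuffer value) {
            return new Values(null, asString(value)); // no key on this path
        }

        @Override
        public Fields getOutputFields() {
            return new Fields("key", "value");
        }
    }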

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
deleted file mode 100644
index 00983cc..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.storm.spout.SchemeAsMultiScheme;
-
-public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme {
-
-    public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
-        super(scheme);
-    }
-
-    public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, final ByteBuffer value) {
-        List<Object> o = ((KeyValueScheme) scheme).deserializeKeyAndValue(key, value);
-        if (o == null) {
-            return null;
-        } else {
-            return Arrays.asList(o);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
deleted file mode 100644
index f77f419..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
+++ /dev/null
@@ -1,21 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.storm.spout.Scheme;
-
-public interface MessageMetadataScheme extends Scheme {
-    List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset);
-}
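
Implementations typically appended the partition and offset to the decoded payload so a downstream bolt could point back at the exact Kafka message. A minimal sketch (class name, UTF-8 decoding, and field names are illustrative assumptions; the module's own StringMessageAndMetadataScheme played the same role):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.List;
    import org.apache.storm.kafka.MessageMetadataScheme;
    import org.apache.storm.kafka.Partition;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    public class Utf8MessageMetadataScheme implements MessageMetadataScheme {
        private static String asString(ByteBuffer buf) {
            byte[] bytes = new byte[buf.remaining()];
            buf.duplicate().get(bytes);
            return new String(bytes, StandardCharsets.UTF_8);
        }

        @Override
        public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
            return new Values(asString(message), partition.partition, offset);
        }

        @Override
        public List<Object> deserialize(ByteBuffer message) {
            return new Values(asString(message), null, null); // metadata unavailable on this path
        }

        @Override
        public Fields getOutputFields() {
            return new Fields("message", "partition", "offset");
        }
    }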

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
deleted file mode 100644
index f52a772..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Arrays;
-import java.util.List;
-import org.apache.storm.spout.SchemeAsMultiScheme;
-
-public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
-    private static final long serialVersionUID = -7172403703813625116L;
-
-    public MessageMetadataSchemeAsMultiScheme(MessageMetadataScheme scheme) {
-        super(scheme);
-    }
-
-    public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
-        List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset);
-        if (o == null) {
-            return null;
-        } else {
-            return Arrays.asList(o);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
deleted file mode 100644
index 9edf28b..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
+++ /dev/null
@@ -1,85 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import com.google.common.base.Objects;
-import java.io.Serializable;
-import org.apache.storm.trident.spout.ISpoutPartition;
-
-
-public class Partition implements ISpoutPartition, Serializable {
-
-    public Broker host;
-    public int partition;
-    public String topic;
-
-    //Flag to keep the partition path id backward compatible with the old implementation, where Partition.getId() == "partition_" + partition
-    private Boolean bUseTopicNameForPartitionPathId;
-
-    // for kryo compatibility
-    private Partition() {
-
-    }
-
-    public Partition(Broker host, String topic, int partition) {
-        this.topic = topic;
-        this.host = host;
-        this.partition = partition;
-        this.bUseTopicNameForPartitionPathId = false;
-    }
-
-    public Partition(Broker host, String topic, int partition, Boolean bUseTopicNameForPartitionPathId) {
-        this.topic = topic;
-        this.host = host;
-        this.partition = partition;
-        this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
-    }
-
-    @Override
-    public int hashCode() {
-        return Objects.hashCode(host, topic, partition);
-    }
-
-    @Override
-    public boolean equals(Object obj) {
-        if (this == obj) {
-            return true;
-        }
-        if (obj == null || getClass() != obj.getClass()) {
-            return false;
-        }
-        final Partition other = (Partition) obj;
-        return Objects.equal(this.host, other.host) && Objects.equal(this.topic, other.topic) &&
-               Objects.equal(this.partition, other.partition);
-    }
-
-    @Override
-    public String toString() {
-        return "Partition{" +
-               "host=" + host +
-               ", topic=" + topic +
-               ", partition=" + partition +
-               '}';
-    }
-
-    @Override
-    public String getId() {
-        if (bUseTopicNameForPartitionPathId) {
-            return topic + "/partition_" + partition;
-        } else {
-            //Keep the partition id backward compatible with the old implementation, where Partition.getId() == "partition_" + partition
-            return "partition_" + partition;
-        }
-    }
-
-}
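
The two getId() forms above are exactly what the metric-path prefix check in KafkaUtils.KafkaOffsetMetric compensates for. A small demonstration, assuming Broker's (host, port) constructor used elsewhere in this module:

    import org.apache.storm.kafka.Broker;
    import org.apache.storm.kafka.Partition;

    public class PartitionIdDemo {
        public static void main(String[] args) {
            Broker broker = new Broker("kafka1.example.com", 9092); // assumed (host, port) ctor
            System.out.println(new Partition(broker, "events", 3).getId());        // partition_3
            System.out.println(new Partition(broker, "events", 3, true).getId());  // events/partition_3
        }
    }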

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
deleted file mode 100644
index 4dba709..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
+++ /dev/null
@@ -1,23 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.List;
-
-public interface PartitionCoordinator {
-    List<PartitionManager> getMyManagedPartitions();
-
-    PartitionManager getManager(Partition partition);
-
-    void refresh();
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
deleted file mode 100644
index 5805c21..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionManager.java
+++ /dev/null
@@ -1,405 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import com.google.common.base.Strings;
-import com.google.common.collect.ImmutableMap;
-import java.io.Serializable;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import kafka.javaapi.consumer.SimpleConsumer;
-import kafka.javaapi.message.ByteBufferMessageSet;
-import kafka.message.MessageAndOffset;
-import org.apache.storm.Config;
-import org.apache.storm.kafka.KafkaSpout.EmitState;
-import org.apache.storm.kafka.trident.MaxMetric;
-import org.apache.storm.metric.api.CombinedMetric;
-import org.apache.storm.metric.api.CountMetric;
-import org.apache.storm.metric.api.MeanReducer;
-import org.apache.storm.metric.api.ReducedMetric;
-import org.apache.storm.spout.SpoutOutputCollector;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class PartitionManager {
-    private static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);
-
-    private final CombinedMetric _fetchAPILatencyMax;
-    private final ReducedMetric _fetchAPILatencyMean;
-    private final CountMetric _fetchAPICallCount;
-    private final CountMetric _fetchAPIMessageCount;
-    // Count of messages which could not be emitted or retried because they were deleted from kafka
-    private final CountMetric _lostMessageCount;
-    // Count of messages which were not retried because failedMsgRetryManager didn't consider offset eligible for
-    // retry
-    private final CountMetric _messageIneligibleForRetryCount;
-    private final FailedMsgRetryManager _failedMsgRetryManager;
-    Long _emittedToOffset;
-    // offset most recently committed to ZooKeeper for this partition
-    Long _committedTo;
-    LinkedList<MessageAndOffset> _waitingToEmit = new LinkedList<MessageAndOffset>();
-    Partition _partition;
-    SpoutConfig _spoutConfig;
-    String _topologyInstanceId;
-    SimpleConsumer _consumer;
-    DynamicPartitionConnections _connections;
-    ZkState _state;
-    Map _topoConf;
-    long numberFailed, numberAcked;
-    // _pending key = Kafka offset, value = time at which the message was first submitted to the topology
-    private SortedMap<Long, Long> _pending = new TreeMap<Long, Long>();
-
-    public PartitionManager(
-        DynamicPartitionConnections connections,
-        String topologyInstanceId,
-        ZkState state,
-        Map<String, Object> topoConf,
-        SpoutConfig spoutConfig,
-        Partition id) {
-        this(connections, topologyInstanceId, state, topoConf, spoutConfig, id, null);
-    }
-
-    /**
-     * @param previousManager previous partition manager if manager for partition is being recreated
-     */
-    public PartitionManager(
-        DynamicPartitionConnections connections,
-        String topologyInstanceId,
-        ZkState state,
-        Map<String, Object> topoConf,
-        SpoutConfig spoutConfig,
-        Partition id,
-        PartitionManager previousManager) {
-        _partition = id;
-        _connections = connections;
-        _spoutConfig = spoutConfig;
-        _topologyInstanceId = topologyInstanceId;
-        _consumer = connections.register(id.host, id.topic, id.partition);
-        _state = state;
-        _topoConf = topoConf;
-        numberAcked = numberFailed = 0;
-
-        if (previousManager != null) {
-            _failedMsgRetryManager = previousManager._failedMsgRetryManager;
-            _committedTo = previousManager._committedTo;
-            _emittedToOffset = previousManager._emittedToOffset;
-            _waitingToEmit = previousManager._waitingToEmit;
-            _pending = previousManager._pending;
-            LOG.info("Recreating PartitionManager based on previous manager, _waitingToEmit size: {}, _pending size: {}",
-                     _waitingToEmit.size(),
-                     _pending.size());
-        } else {
-            try {
-                _failedMsgRetryManager = (FailedMsgRetryManager) Class.forName(spoutConfig.failedMsgRetryManagerClass).newInstance();
-                _failedMsgRetryManager.prepare(spoutConfig, _topoConf);
-            } catch (ClassNotFoundException | InstantiationException | IllegalAccessException e) {
-                throw new IllegalArgumentException(String.format("Failed to create an instance of <%s> from: <%s>",
-                                                                 FailedMsgRetryManager.class,
-                                                                 spoutConfig.failedMsgRetryManagerClass), e);
-            }
-
-            String jsonTopologyId = null;
-            Long jsonOffset = null;
-            String path = committedPath();
-            try {
-                Map<Object, Object> json = _state.readJSON(path);
-                LOG.info("Read partition information from: " + path + "  --> " + json);
-                if (json != null) {
-                    jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
-                    jsonOffset = (Long) json.get("offset");
-                }
-            } catch (Throwable e) {
-                LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
-            }
-
-            String topic = _partition.topic;
-            Long currentOffset = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig);
-
-            if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
-                _committedTo = currentOffset;
-                LOG.info("No partition information found, using configuration to determine offset");
-            } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.ignoreZkOffsets) {
-                _committedTo = KafkaUtils.getOffset(_consumer, topic, id.partition, spoutConfig.startOffsetTime);
-                LOG.info("Topology change detected and ignore zookeeper offsets set to true, using configuration to determine offset");
-            } else {
-                _committedTo = jsonOffset;
-                LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId +
-                         " - new topology_id: " + topologyInstanceId);
-            }
-
-            if (currentOffset - _committedTo > spoutConfig.maxOffsetBehind || _committedTo <= 0) {
-                LOG.info("Last commit offset from zookeeper: " + _committedTo);
-                Long lastCommittedOffset = _committedTo;
-                _committedTo = currentOffset;
-                LOG.info("Commit offset " + lastCommittedOffset + " is more than " +
-                         spoutConfig.maxOffsetBehind + " behind latest offset " + currentOffset + ", resetting to startOffsetTime=" +
-                         spoutConfig.startOffsetTime);
-            }
-
-            LOG.info("Starting Kafka " + _consumer.host() + " " + id + " from offset " + _committedTo);
-            _emittedToOffset = _committedTo;
-        }
-
-        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
-        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
-        _fetchAPICallCount = new CountMetric();
-        _fetchAPIMessageCount = new CountMetric();
-        _lostMessageCount = new CountMetric();
-        _messageIneligibleForRetryCount = new CountMetric();
-    }
-
-    public Map<String, Object> getMetricsDataMap() {
-        String metricPrefix = _partition.getId();
-
-        Map<String, Object> ret = new HashMap<>();
-        ret.put(metricPrefix + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
-        ret.put(metricPrefix + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
-        ret.put(metricPrefix + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
-        ret.put(metricPrefix + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
-        ret.put(metricPrefix + "/lostMessageCount", _lostMessageCount.getValueAndReset());
-        ret.put(metricPrefix + "/messageIneligibleForRetryCount", _messageIneligibleForRetryCount.getValueAndReset());
-        return ret;
-    }
-
-    //returns EmitState.NO_EMITTED when it has reached the end of the current batch
-    public EmitState next(SpoutOutputCollector collector) {
-        if (_waitingToEmit.isEmpty()) {
-            fill();
-        }
-        while (true) {
-            MessageAndOffset toEmit = _waitingToEmit.pollFirst();
-            if (toEmit == null) {
-                return EmitState.NO_EMITTED;
-            }
-
-            Iterable<List<Object>> tups;
-            if (_spoutConfig.scheme instanceof MessageMetadataSchemeAsMultiScheme) {
-                tups = KafkaUtils.generateTuples((MessageMetadataSchemeAsMultiScheme) _spoutConfig.scheme, toEmit.message(), _partition,
-                                                 toEmit.offset());
-            } else {
-                tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.message(), _partition.topic);
-            }
-
-            if ((tups != null) && tups.iterator().hasNext()) {
-                if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
-                    for (List<Object> tup : tups) {
-                        collector.emit(_spoutConfig.outputStreamId, tup, new KafkaMessageId(_partition, toEmit.offset()));
-                    }
-                } else {
-                    for (List<Object> tup : tups) {
-                        collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset()));
-                    }
-                }
-                break;
-            } else {
-                ack(toEmit.offset());
-            }
-        }
-        if (!_waitingToEmit.isEmpty()) {
-            return EmitState.EMITTED_MORE_LEFT;
-        } else {
-            return EmitState.EMITTED_END;
-        }
-    }
-
-
-    private void fill() {
-        long start = System.currentTimeMillis();
-        Long offset;
-
-        // Are there failed tuples? If so, fetch those first.
-        offset = this._failedMsgRetryManager.nextFailedMessageToRetry();
-        final boolean processingNewTuples = (offset == null);
-        if (processingNewTuples) {
-            offset = _emittedToOffset;
-        }
-
-        ByteBufferMessageSet msgs = null;
-        try {
-            msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, offset);
-        } catch (TopicOffsetOutOfRangeException e) {
-            offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
-            // fetch failed, so don't update the fetch metrics
-
-            //fix bug [STORM-643] : remove outdated failed offsets
-            if (!processingNewTuples) {
-                // For the case of EarliestTime it would be better to discard
-                // all the failed offsets, that are earlier than actual EarliestTime
-                // offset, since they are anyway not there.
-                // These calls to broker API will be then saved.
-                Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
-
-                // Omitted messages have not been acked and may be lost
-                if (null != omitted) {
-                    _lostMessageCount.incrBy(omitted.size());
-                }
-
-                _pending.headMap(offset).clear();
-
-                LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
-            }
-
-            if (offset > _emittedToOffset) {
-                _lostMessageCount.incrBy(offset - _emittedToOffset);
-                _emittedToOffset = offset;
-                LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
-            }
-
-            return;
-        }
-        long millis = System.currentTimeMillis() - start;
-        _fetchAPILatencyMax.update(millis);
-        _fetchAPILatencyMean.update(millis);
-        _fetchAPICallCount.incr();
-        if (msgs != null) {
-            int numMessages = 0;
-
-            for (MessageAndOffset msg : msgs) {
-                final Long cur_offset = msg.offset();
-                if (cur_offset < offset) {
-                    // Skip any old offsets.
-                    continue;
-                }
-                if (processingNewTuples || this._failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
-                    numMessages += 1;
-                    if (!_pending.containsKey(cur_offset)) {
-                        _pending.put(cur_offset, System.currentTimeMillis());
-                    }
-                    _waitingToEmit.add(msg);
-                    _emittedToOffset = Math.max(msg.nextOffset(), _emittedToOffset);
-                    if (_failedMsgRetryManager.shouldReEmitMsg(cur_offset)) {
-                        this._failedMsgRetryManager.retryStarted(cur_offset);
-                    }
-                }
-            }
-            _fetchAPIMessageCount.incrBy(numMessages);
-        }
-    }
-
-    public void ack(Long offset) {
-        if (!_pending.isEmpty() && _pending.firstKey() < offset - _spoutConfig.maxOffsetBehind) {
-            // Too many things pending!
-            _pending.headMap(offset - _spoutConfig.maxOffsetBehind).clear();
-        }
-        _pending.remove(offset);
-        this._failedMsgRetryManager.acked(offset);
-        numberAcked++;
-    }
-
-    public void fail(Long offset) {
-        if (offset < _emittedToOffset - _spoutConfig.maxOffsetBehind) {
-            LOG.info(
-                "Skipping failed tuple at offset={}" +
-                " because it's more than maxOffsetBehind={}" +
-                " behind _emittedToOffset={} for {}",
-                offset,
-                _spoutConfig.maxOffsetBehind,
-                _emittedToOffset,
-                _partition
-            );
-        } else {
-            LOG.debug("Failing at offset={} with _pending.size()={} pending and _emittedToOffset={} for {}", offset, _pending.size(),
-                      _emittedToOffset, _partition);
-            numberFailed++;
-            if (numberAcked == 0 && numberFailed > _spoutConfig.maxOffsetBehind) {
-                throw new RuntimeException("Too many tuple failures");
-            }
-
-            // Offset may not be considered for retry by failedMsgRetryManager
-            if (this._failedMsgRetryManager.retryFurther(offset)) {
-                this._failedMsgRetryManager.failed(offset);
-            } else {
-                // state for the offset should be cleaned up
-                LOG.warn("Will not retry failed kafka offset {} further", offset);
-                _messageIneligibleForRetryCount.incr();
-                this._failedMsgRetryManager.cleanOffsetAfterRetries(_partition, offset);
-                _pending.remove(offset);
-                this._failedMsgRetryManager.acked(offset);
-            }
-        }
-    }
-
-    public void commit() {
-        long lastCompletedOffset = lastCompletedOffset();
-        if (_committedTo != lastCompletedOffset) {
-            LOG.debug("Writing last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition,
-                      _topologyInstanceId);
-            Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
-                                                                         .put("topology", ImmutableMap.of("id", _topologyInstanceId,
-                                                                                                          "name", _topoConf
-                                                                                                              .get(Config.TOPOLOGY_NAME)))
-                                                                         .put("offset", lastCompletedOffset)
-                                                                         .put("partition", _partition.partition)
-                                                                         .put("broker", ImmutableMap.of("host", _partition.host.host,
-                                                                                                        "port", _partition.host.port))
-                                                                         .put("topic", _partition.topic).build();
-            _state.writeJSON(committedPath(), data);
-
-            _committedTo = lastCompletedOffset;
-            LOG.debug("Wrote last completed offset ({}) to ZK for {} for topology: {}", lastCompletedOffset, _partition,
-                      _topologyInstanceId);
-        } else {
-            LOG.debug("No new offset for {} for topology: {}", _partition, _topologyInstanceId);
-        }
-    }
-
-    protected String committedPath() {
-        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
-    }
-
-    public long lastCompletedOffset() {
-        if (_pending.isEmpty()) {
-            return _emittedToOffset;
-        } else {
-            return _pending.firstKey();
-        }
-    }
-
-    public OffsetData getOffsetData() {
-        return new OffsetData(_emittedToOffset, lastCompletedOffset());
-    }
-
-    public Partition getPartition() {
-        return _partition;
-    }
-
-    public void close() {
-        commit();
-        _connections.unregister(_partition.host, _partition.topic, _partition.partition);
-    }
-
-    static class KafkaMessageId implements Serializable {
-        public Partition partition;
-        public long offset;
-
-        public KafkaMessageId(Partition partition, long offset) {
-            this.partition = partition;
-            this.offset = offset;
-        }
-    }
-
-    public static class OffsetData {
-        public long latestEmittedOffset;
-        public long latestCompletedOffset;
-
-        public OffsetData(long latestEmittedOffset, long latestCompletedOffset) {
-            this.latestEmittedOffset = latestEmittedOffset;
-            this.latestCompletedOffset = latestCompletedOffset;
-        }
-    }
-}
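
lastCompletedOffset() above returns the earliest still-pending offset, or the emit high-water mark when nothing is pending; that value gates what commit() writes to ZooKeeper and bounds the spoutLag metric in KafkaUtils. A self-contained sketch of the bookkeeping (names are illustrative):

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class CompletedOffsetDemo {
        public static void main(String[] args) {
            // offset -> first-submit timestamp, as in PartitionManager._pending
            SortedMap<Long, Long> pending = new TreeMap<>();
            long emittedToOffset = 108L;
            pending.put(103L, System.currentTimeMillis());
            pending.put(105L, System.currentTimeMillis());

            // the earliest un-acked offset gates the commit point
            long lastCompleted = pending.isEmpty() ? emittedToOffset : pending.firstKey();
            System.out.println(lastCompleted); // 103

            pending.remove(103L); // ack(103)
            lastCompleted = pending.isEmpty() ? emittedToOffset : pending.firstKey();
            System.out.println(lastCompleted); // 105
        }
    }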

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
deleted file mode 100644
index 74a4a3b..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/SpoutConfig.java
+++ /dev/null
@@ -1,58 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.io.Serializable;
-import java.util.List;
-
-
-public class SpoutConfig extends KafkaConfig implements Serializable {
-    private static final long serialVersionUID = -1247769246497567352L;
-    public List<String> zkServers = null;
-    public Integer zkPort = null;
-    public String zkRoot = null;
-    public String id = null;
-
-    public String outputStreamId;
-
-    // setting for how often to save the current kafka offset to ZooKeeper
-    public long stateUpdateIntervalMs = 2000;
-
-    // Retry strategy for failed messages
-    public String failedMsgRetryManagerClass = ExponentialBackoffMsgRetryManager.class.getName();
-
-    // Exponential back-off retry settings.  These are used by ExponentialBackoffMsgRetryManager for retrying messages after a bolt
-    // calls OutputCollector.fail(). These come into effect only if ExponentialBackoffMsgRetryManager is being used.
-    public long retryInitialDelayMs = 0;
-    public double retryDelayMultiplier = 1.0;
-    public long retryDelayMaxMs = 60 * 1000;
-    public int retryLimit = -1;
-
-    /**
-     * Create a SpoutConfig without setting client.id, which can make the source application ambiguous when tracing Kafka calls.
-     */
-    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
-        super(hosts, topic);
-        this.zkRoot = zkRoot;
-        this.id = id;
-    }
-
-    /**
-     * Create a SpoutConfig with a client.id value.
-     */
-    public SpoutConfig(BrokerHosts hosts, String topic, String clientId, String zkRoot, String id) {
-        super(hosts, topic, clientId);
-        this.zkRoot = zkRoot;
-        this.id = id;
-    }
-}
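
For reference during migration, the canonical wiring removed by this commit looked roughly like the following (addresses, names, and parallelism are placeholders; storm-kafka-client's KafkaSpout is the intended replacement):

    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.KeyValueSchemeAsMultiScheme;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.StringKeyValueScheme;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.topology.TopologyBuilder;

    public class ClassicKafkaSpoutWiring {
        public static void main(String[] args) {
            SpoutConfig cfg = new SpoutConfig(
                new ZkHosts("zk1:2181"), // brokers discovered via ZooKeeper
                "events",                // topic
                "/kafka-spout",          // zkRoot under which offsets are stored
                "event-spout");          // id, unique per spout
            cfg.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
            cfg.retryInitialDelayMs = 100; // exponential back-off retry settings
            cfg.retryDelayMultiplier = 2.0;

            TopologyBuilder builder = new TopologyBuilder();
            builder.setSpout("kafka", new KafkaSpout(cfg), 4);
        }
    }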

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
deleted file mode 100644
index 8d12ee1..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticCoordinator.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-
-
-public class StaticCoordinator implements PartitionCoordinator {
-    Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
-    List<PartitionManager> _allManagers = new ArrayList<>();
-
-    public StaticCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig config, ZkState state,
-                             int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
-        StaticHosts hosts = (StaticHosts) config.hosts;
-        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
-        partitions.add(hosts.getPartitionInformation());
-        List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(partitions, totalTasks, taskIndex, taskId);
-        for (Partition myPartition : myPartitions) {
-            _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, topoConf, config, myPartition));
-        }
-        _allManagers = new ArrayList<>(_managers.values());
-    }
-
-    @Override
-    public List<PartitionManager> getMyManagedPartitions() {
-        return _allManagers;
-    }
-
-    public PartitionManager getManager(Partition partition) {
-        return _managers.get(partition);
-    }
-
-    @Override
-    public void refresh() { return; }
-
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
deleted file mode 100644
index 1f8f903..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticHosts.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-
-/**
- * Date: 11/05/2013
- * Time: 14:43
- */
-public class StaticHosts implements BrokerHosts {
-
-
-    private GlobalPartitionInformation partitionInformation;
-
-    public StaticHosts(GlobalPartitionInformation partitionInformation) {
-        this.partitionInformation = partitionInformation;
-    }
-
-    public GlobalPartitionInformation getPartitionInformation() {
-        return partitionInformation;
-    }
-}
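
A static mapping was assembled by hand, one known broker per partition, with no ZooKeeper discovery. A sketch, assuming GlobalPartitionInformation's topic constructor and addPartition(int, Broker) from the trident package imported above:

    import org.apache.storm.kafka.Broker;
    import org.apache.storm.kafka.StaticHosts;
    import org.apache.storm.kafka.trident.GlobalPartitionInformation;

    public class StaticHostsDemo {
        public static void main(String[] args) {
            GlobalPartitionInformation info = new GlobalPartitionInformation("events");
            info.addPartition(0, new Broker("kafka1.example.com", 9092));
            info.addPartition(1, new Broker("kafka2.example.com", 9092));
            StaticHosts hosts = new StaticHosts(info);
        }
    }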

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
deleted file mode 100644
index 2c9d4f2..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StaticPartitionConnections.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.HashMap;
-import java.util.Map;
-import kafka.javaapi.consumer.SimpleConsumer;
-
-public class StaticPartitionConnections {
-    Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, SimpleConsumer>();
-    KafkaConfig _config;
-    StaticHosts hosts;
-
-    public StaticPartitionConnections(KafkaConfig conf) {
-        _config = conf;
-        if (!(conf.hosts instanceof StaticHosts)) {
-            throw new RuntimeException("Must configure with static hosts");
-        }
-        this.hosts = (StaticHosts) conf.hosts;
-    }
-
-    public SimpleConsumer getConsumer(int partition) {
-        if (!_kafka.containsKey(partition)) {
-            Broker hp = hosts.getPartitionInformation().getBrokerFor(partition);
-            _kafka.put(partition, new SimpleConsumer(hp.host, hp.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
-
-        }
-        return _kafka.get(partition);
-    }
-
-    public void close() {
-        for (SimpleConsumer consumer : _kafka.values()) {
-            consumer.close();
-        }
-    }
-}

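For context on how the two classes above fit together: StaticHosts carries a fixed GlobalPartitionInformation, and StaticPartitionConnections lazily opens one SimpleConsumer per partition and caches it until close(). A minimal usage sketch, assuming GlobalPartitionInformation's topic constructor and addPartition(int, Broker) from elsewhere in this module; the broker addresses and topic name are placeholders:

    // Sketch only: hosts and topic are illustrative, not from the source.
    GlobalPartitionInformation partitions = new GlobalPartitionInformation("events");
    partitions.addPartition(0, new Broker("kafka1.example.com", 9092));
    partitions.addPartition(1, new Broker("kafka2.example.com", 9092));

    KafkaConfig config = new KafkaConfig(new StaticHosts(partitions), "events");
    StaticPartitionConnections connections = new StaticPartitionConnections(config);

    SimpleConsumer consumer = connections.getConsumer(0); // opened on first use, then cached
    // ... fetch from the consumer ...
    connections.close(); // closes every cached SimpleConsumer
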
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
deleted file mode 100644
index 3d62961..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringKeyValueScheme.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import com.google.common.collect.ImmutableMap;
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.storm.tuple.Values;
-
-public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
-
-    @Override
-    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
-        if (key == null) {
-            return deserialize(value);
-        }
-        String keyString = StringScheme.deserializeString(key);
-        String valueString = StringScheme.deserializeString(value);
-        return new Values(ImmutableMap.of(keyString, valueString));
-    }
-
-}

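The scheme above has an asymmetry worth noting: a null key degrades to the plain StringScheme output, while a present key wraps both strings in a single-entry map. A behavior sketch grounded in the code above; the byte contents are arbitrary:

    StringKeyValueScheme scheme = new StringKeyValueScheme();

    // Null key: falls back to StringScheme.deserialize, emitting a plain string.
    List<Object> plain = scheme.deserializeKeyAndValue(
        null, ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8)));
    // plain is equivalent to new Values("value")

    // Non-null key: emits a single-entry map of key -> value.
    List<Object> keyed = scheme.deserializeKeyAndValue(
        ByteBuffer.wrap("key".getBytes(StandardCharsets.UTF_8)),
        ByteBuffer.wrap("value".getBytes(StandardCharsets.UTF_8)));
    // keyed is equivalent to new Values(ImmutableMap.of("key", "value"))
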
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
deleted file mode 100644
index ab6e500..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMessageAndMetadataScheme.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.List;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class StringMessageAndMetadataScheme extends StringScheme implements MessageMetadataScheme {
-    public static final String STRING_SCHEME_PARTITION_KEY = "partition";
-    public static final String STRING_SCHEME_OFFSET = "offset";
-    private static final long serialVersionUID = -5441841920447947374L;
-
-    @Override
-    public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
-        String stringMessage = StringScheme.deserializeString(message);
-        return new Values(stringMessage, partition.partition, offset);
-    }
-
-    @Override
-    public Fields getOutputFields() {
-        return new Fields(STRING_SCHEME_KEY, STRING_SCHEME_PARTITION_KEY, STRING_SCHEME_OFFSET);
-    }
-
-}

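The metadata scheme pairs each message with its source partition id and offset, matching the declared fields ("str", "partition", "offset"). A small sketch, assuming Partition's (Broker, topic, id) constructor from this module; the broker address is a placeholder:

    StringMessageAndMetadataScheme scheme = new StringMessageAndMetadataScheme();
    // Placeholder broker; the (Broker, topic, partition) constructor is assumed.
    Partition partition = new Partition(new Broker("kafka1.example.com", 9092), "events", 0);

    List<Object> tuple = scheme.deserializeMessageWithMetadata(
        ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)), partition, 42L);
    // tuple is equivalent to new Values("hello", 0, 42L)
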
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
deleted file mode 100644
index 061b30a..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringMultiSchemeWithTopic.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.util.Collections;
-import java.util.List;
-import org.apache.storm.spout.MultiScheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-
-public class StringMultiSchemeWithTopic implements MultiScheme {
-
-    public static final String STRING_SCHEME_KEY = "str";
-
-    public static final String TOPIC_KEY = "topic";
-
-    @Override
-    public Iterable<List<Object>> deserialize(ByteBuffer bytes) {
-        throw new UnsupportedOperationException();
-    }
-
-    public Iterable<List<Object>> deserializeWithTopic(String topic, ByteBuffer bytes) {
-        List<Object> items = new Values(StringScheme.deserializeString(bytes), topic);
-        return Collections.singletonList(items);
-    }
-
-    public Fields getOutputFields() {
-        return new Fields(STRING_SCHEME_KEY, TOPIC_KEY);
-    }
-}

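Note that the plain MultiScheme entry point above is deliberately unsupported; only the topic-aware path is usable, emitting (message, topic) pairs. A behavior sketch grounded in the code above:

    StringMultiSchemeWithTopic scheme = new StringMultiSchemeWithTopic();

    // Emits a single tuple ("hello", "events") for fields ("str", "topic").
    Iterable<List<Object>> tuples = scheme.deserializeWithTopic(
        "events", ByteBuffer.wrap("hello".getBytes(StandardCharsets.UTF_8)));

    // The plain entry point throws: scheme.deserialize(buffer)
    // raises UnsupportedOperationException by design.
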
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
deleted file mode 100644
index bcbc058..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/StringScheme.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.ByteBuffer;
-import java.nio.charset.Charset;
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import org.apache.storm.spout.Scheme;
-import org.apache.storm.tuple.Fields;
-import org.apache.storm.tuple.Values;
-import org.apache.storm.utils.Utils;
-
-public class StringScheme implements Scheme {
-    public static final String STRING_SCHEME_KEY = "str";
-    private static final Charset UTF8_CHARSET = StandardCharsets.UTF_8;
-
-    public static String deserializeString(ByteBuffer string) {
-        if (string.hasArray()) {
-            int base = string.arrayOffset();
-            return new String(string.array(), base + string.position(), string.remaining(), UTF8_CHARSET);
-        } else {
-            return new String(Utils.toByteArray(string), UTF8_CHARSET);
-        }
-    }
-
-    public List<Object> deserialize(ByteBuffer bytes) {
-        return new Values(deserializeString(bytes));
-    }
-
-    public Fields getOutputFields() {
-        return new Fields(STRING_SCHEME_KEY);
-    }
-}

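deserializeString above takes two paths: array-backed buffers are decoded in place, with arrayOffset() + position() locating the live region, while direct buffers are copied out first. The offset arithmetic is what makes sliced heap buffers safe; a sketch:

    // A sliced heap buffer shares its backing array but starts at offset 4.
    byte[] backing = "XXXXhello".getBytes(StandardCharsets.UTF_8);
    ByteBuffer whole = ByteBuffer.wrap(backing);
    whole.position(4);
    ByteBuffer slice = whole.slice(); // hasArray() == true, arrayOffset() == 4

    String s = StringScheme.deserializeString(slice); // "hello", decoded without copying

    // A direct buffer exposes no array, so the else branch copies via Utils.toByteArray.
    ByteBuffer direct = ByteBuffer.allocateDirect(5);
    direct.put("hello".getBytes(StandardCharsets.UTF_8));
    direct.flip();
    String d = StringScheme.deserializeString(direct); // "hello"
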
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
deleted file mode 100644
index 613a62e..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/TopicOffsetOutOfRangeException.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-public class TopicOffsetOutOfRangeException extends RuntimeException {
-
-    public TopicOffsetOutOfRangeException(String message) {
-        super(message);
-    }
-}

http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
deleted file mode 100644
index bc9ebd5..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkCoordinator.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import org.apache.storm.kafka.trident.GlobalPartitionInformation;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.storm.kafka.KafkaUtils.taskPrefix;
-
-public class ZkCoordinator implements PartitionCoordinator {
-    private static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
-
-    SpoutConfig _spoutConfig;
-    int _taskIndex;
-    int _totalTasks;
-    int _taskId;
-    String _topologyInstanceId;
-    Map<Partition, PartitionManager> _managers = new HashMap<>();
-    List<PartitionManager> _cachedList = new ArrayList<PartitionManager>();
-    Long _lastRefreshTime = null;
-    int _refreshFreqMs;
-    DynamicPartitionConnections _connections;
-    DynamicBrokersReader _reader;
-    ZkState _state;
-    Map<String, Object> _topoConf;
-
-    public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state,
-                         int taskIndex, int totalTasks, int taskId, String topologyInstanceId) {
-        this(connections, topoConf, spoutConfig, state, taskIndex, totalTasks, taskId, topologyInstanceId,
-             buildReader(topoConf, spoutConfig));
-    }
-
-    public ZkCoordinator(DynamicPartitionConnections connections, Map<String, Object> topoConf, SpoutConfig spoutConfig, ZkState state,
-                         int taskIndex, int totalTasks, int taskId, String topologyInstanceId, DynamicBrokersReader reader) {
-        _spoutConfig = spoutConfig;
-        _connections = connections;
-        _taskIndex = taskIndex;
-        _totalTasks = totalTasks;
-        _taskId = taskId;
-        _topologyInstanceId = topologyInstanceId;
-        _topoConf = topoConf;
-        _state = state;
-        ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
-        _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
-        _reader = reader;
-    }
-
-    private static DynamicBrokersReader buildReader(Map<String, Object> topoConf, SpoutConfig spoutConfig) {
-        ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
-        return new DynamicBrokersReader(topoConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic);
-    }
-
-    @Override
-    public List<PartitionManager> getMyManagedPartitions() {
-        if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
-            refresh();
-            _lastRefreshTime = System.currentTimeMillis();
-        }
-        return _cachedList;
-    }
-
-    @Override
-    public void refresh() {
-        try {
-            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Refreshing partition manager connections");
-            List<GlobalPartitionInformation> brokerInfo = _reader.getBrokerInfo();
-            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex, _taskId);
-
-            Set<Partition> curr = _managers.keySet();
-            Set<Partition> newPartitions = new HashSet<Partition>(mine);
-            newPartitions.removeAll(curr);
-
-            Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
-            deletedPartitions.removeAll(mine);
-
-            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Deleted partition managers: " + deletedPartitions.toString());
-
-            Map<Integer, PartitionManager> deletedManagers = new HashMap<>();
-            for (Partition id : deletedPartitions) {
-                deletedManagers.put(id.partition, _managers.remove(id));
-            }
-            for (PartitionManager manager : deletedManagers.values()) {
-                if (manager != null) manager.close();
-            }
-            LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " New partition managers: " + newPartitions.toString());
-
-            for (Partition id : newPartitions) {
-                PartitionManager man = new PartitionManager(
-                    _connections,
-                    _topologyInstanceId,
-                    _state,
-                    _topoConf,
-                    _spoutConfig,
-                    id,
-                    deletedManagers.get(id.partition));
-                _managers.put(id, man);
-            }
-
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-        _cachedList = new ArrayList<PartitionManager>(_managers.values());
-        LOG.info(taskPrefix(_taskIndex, _totalTasks, _taskId) + " Finished refreshing");
-    }
-
-    @Override
-    public PartitionManager getManager(Partition partition) {
-        return _managers.get(partition);
-    }
-}

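One subtlety in refresh() above: deletedManagers is keyed by the int partition id rather than by the Partition object. When a partition's leader moves to a different broker, the old and new Partition instances are distinct keys, so the same id can appear in both the deleted and new sets within a single refresh, and the retiring manager is handed to its replacement so the offset state survives the move. A sketch of the distinction, assuming Partition equality includes the broker (as its use as a map key here implies); broker addresses are placeholders:

    Partition oldLeader = new Partition(new Broker("kafka1.example.com", 9092), "events", 3);
    Partition newLeader = new Partition(new Broker("kafka2.example.com", 9092), "events", 3);

    // Distinct keys in _managers, but the same int id in deletedManagers:
    // oldLeader.equals(newLeader) is false, yet both have partition == 3,
    // so the new PartitionManager is seeded with deletedManagers.get(3).
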
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
deleted file mode 100644
index 9c6b29d..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkHosts.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-
-public class ZkHosts implements BrokerHosts {
-    private static final String DEFAULT_ZK_PATH = "/brokers";
-
-    public String brokerZkStr = null;
-    public String brokerZkPath = null; // e.g., /kafka/brokers
-    public int refreshFreqSecs = 60;
-
-    public ZkHosts(String brokerZkStr, String brokerZkPath) {
-        this.brokerZkStr = brokerZkStr;
-        this.brokerZkPath = brokerZkPath;
-    }
-
-    public ZkHosts(String brokerZkStr) {
-        this(brokerZkStr, DEFAULT_ZK_PATH);
-    }
-}

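ZkHosts defaults the broker path to "/brokers" and re-reads broker metadata every refreshFreqSecs. A minimal sketch; the ZooKeeper addresses are placeholders, and the second form matches the chroot'd "/kafka/brokers" example in the field comment above:

    ZkHosts defaultPath = new ZkHosts("zk1.example.com:2181,zk2.example.com:2181");
    ZkHosts chrooted = new ZkHosts("zk1.example.com:2181", "/kafka/brokers");
    chrooted.refreshFreqSecs = 120; // metadata re-read interval, default 60 seconds
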
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
deleted file mode 100644
index 3d27173..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ZkState.java
+++ /dev/null
@@ -1,112 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka;
-
-import java.nio.charset.Charset;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.retry.RetryNTimes;
-import org.apache.storm.Config;
-import org.apache.storm.utils.ObjectReader;
-import org.apache.zookeeper.CreateMode;
-import org.json.simple.JSONValue;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class ZkState {
-    private static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
-    CuratorFramework _curator;
-
-    public ZkState(Map<String, Object> stateConf) {
-        stateConf = new HashMap<>(stateConf);
-
-        try {
-            _curator = newCurator(stateConf);
-            _curator.start();
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    private CuratorFramework newCurator(final Map<String, Object> stateConf)
-        throws Exception {
-        Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
-        String serverPorts = "";
-        for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
-            serverPorts = serverPorts + server + ":" + port + ",";
-        }
-        return CuratorFrameworkFactory.newClient(serverPorts,
-                                                 ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
-                                                 ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
-                                                 new RetryNTimes(ObjectReader.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
-                                                                 ObjectReader
-                                                                     .getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
-    }
-
-    public CuratorFramework getCurator() {
-        assert _curator != null;
-        return _curator;
-    }
-
-    public void writeJSON(String path, Map<Object, Object> data) {
-        LOG.debug("Writing {} the data {}", path, data.toString());
-        writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
-    }
-
-    public void writeBytes(String path, byte[] bytes) {
-        try {
-            if (_curator.checkExists().forPath(path) == null) {
-                _curator.create()
-                        .creatingParentsIfNeeded()
-                        .withMode(CreateMode.PERSISTENT)
-                        .forPath(path, bytes);
-            } else {
-                _curator.setData().forPath(path, bytes);
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public Map<Object, Object> readJSON(String path) {
-        try {
-            byte[] b = readBytes(path);
-            if (b == null) {
-                return null;
-            }
-            return (Map<Object, Object>) JSONValue.parseWithException(new String(b, "UTF-8"));
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public byte[] readBytes(String path) {
-        try {
-            if (_curator.checkExists().forPath(path) != null) {
-                return _curator.getData().forPath(path);
-            } else {
-                return null;
-            }
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-
-    public void close() {
-        _curator.close();
-        _curator = null;
-    }
-}

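ZkState gives spouts a small create-or-update / read-or-null wrapper over Curator, configured from the transactional ZooKeeper keys read in newCurator() above. A usage sketch with illustrative values; the ZooKeeper host and the znode path are placeholders:

    Map<String, Object> conf = new HashMap<>();
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, Arrays.asList("zk1.example.com"));
    conf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, 2181);
    conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
    conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 15000);
    conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 5);
    conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 1000);

    ZkState state = new ZkState(conf);
    Map<Object, Object> offsetData = new HashMap<>();
    offsetData.put("offset", 42L);
    offsetData.put("topic", "events");
    state.writeJSON("/consumers/spout-1/partition_0", offsetData);          // create or update
    Map<Object, Object> back = state.readJSON("/consumers/spout-1/partition_0"); // null if absent
    state.close();
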
http://git-wip-us.apache.org/repos/asf/storm/blob/e58ac3e0/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
deleted file mode 100644
index e0b94f3..0000000
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/bolt/KafkaBolt.java
+++ /dev/null
@@ -1,172 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.  The ASF licenses this file to you under the Apache License, Version
- * 2.0 (the "License"); you may not use this file except in compliance with the License.  You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the specific language governing permissions
- * and limitations under the License.
- */
-
-package org.apache.storm.kafka.bolt;
-
-import java.util.Map;
-import java.util.Properties;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.Future;
-import org.apache.kafka.clients.producer.Callback;
-import org.apache.kafka.clients.producer.KafkaProducer;
-import org.apache.kafka.clients.producer.ProducerRecord;
-import org.apache.kafka.clients.producer.RecordMetadata;
-import org.apache.storm.kafka.bolt.mapper.FieldNameBasedTupleToKafkaMapper;
-import org.apache.storm.kafka.bolt.mapper.TupleToKafkaMapper;
-import org.apache.storm.kafka.bolt.selector.DefaultTopicSelector;
-import org.apache.storm.kafka.bolt.selector.KafkaTopicSelector;
-import org.apache.storm.task.OutputCollector;
-import org.apache.storm.task.TopologyContext;
-import org.apache.storm.topology.OutputFieldsDeclarer;
-import org.apache.storm.topology.base.BaseTickTupleAwareRichBolt;
-import org.apache.storm.tuple.Tuple;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-
-/**
- * Bolt implementation that can send Tuple data to Kafka.
- * <p>
- * It expects the producer configuration and the topic to be present in the Storm
- * config under the keys 'kafka.broker.properties' and 'topic' respectively.
- * </p>
- * <p>
- * This bolt uses the 0.8.2 Kafka Producer API.
- * </p>
- * <p>
- * It also works for sending tuples to older Kafka versions (e.g., 0.8.1).
- * </p>
- * @deprecated Please use the KafkaBolt in storm-kafka-client
- */
-@Deprecated
-public class KafkaBolt<K, V> extends BaseTickTupleAwareRichBolt {
-
-    public static final String TOPIC = "topic";
-    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
-    private KafkaProducer<K, V> producer;
-    private OutputCollector collector;
-    private TupleToKafkaMapper<K, V> mapper;
-    private KafkaTopicSelector topicSelector;
-    private Properties boltSpecfiedProperties = new Properties();
-    /**
-     * With the default settings (fireAndForget false, async true), a callback acks or fails the
-     * tuple once the asynchronous send completes. Setting fireAndForget to true skips waiting
-     * for Kafka's ack entirely, so the "acks" setting in the 0.8.2 Producer API config has no
-     * effect. Setting async to false switches to synchronous sending.
-     */
-    private boolean fireAndForget = false;
-    private boolean async = true;
-
-    public KafkaBolt() {}
-
-    public KafkaBolt<K, V> withTupleToKafkaMapper(TupleToKafkaMapper<K, V> mapper) {
-        this.mapper = mapper;
-        return this;
-    }
-
-    public KafkaBolt<K, V> withTopicSelector(KafkaTopicSelector selector) {
-        this.topicSelector = selector;
-        return this;
-    }
-
-    public KafkaBolt<K, V> withProducerProperties(Properties producerProperties) {
-        this.boltSpecfiedProperties = producerProperties;
-        return this;
-    }
-
-    @Override
-    public void prepare(Map<String, Object> topoConf, TopologyContext context, OutputCollector collector) {
-        //for backward compatibility.
-        if (mapper == null) {
-            this.mapper = new FieldNameBasedTupleToKafkaMapper<K, V>();
-        }
-
-        //for backward compatibility.
-        if (topicSelector == null) {
-            if (topoConf.containsKey(TOPIC)) {
-                this.topicSelector = new DefaultTopicSelector((String) topoConf.get(TOPIC));
-            } else {
-                throw new IllegalArgumentException("topic should be specified in bolt's configuration");
-            }
-        }
-
-        producer = new KafkaProducer<>(boltSpecfiedProperties);
-        this.collector = collector;
-    }
-
-    @Override
-    protected void process(final Tuple input) {
-        K key = null;
-        V message = null;
-        String topic = null;
-        try {
-            key = mapper.getKeyFromTuple(input);
-            message = mapper.getMessageFromTuple(input);
-            topic = topicSelector.getTopic(input);
-            if (topic != null) {
-                Callback callback = null;
-
-                if (!fireAndForget && async) {
-                    callback = new Callback() {
-                        @Override
-                        public void onCompletion(RecordMetadata ignored, Exception e) {
-                            synchronized (collector) {
-                                if (e != null) {
-                                    collector.reportError(e);
-                                    collector.fail(input);
-                                } else {
-                                    collector.ack(input);
-                                }
-                            }
-                        }
-                    };
-                }
-                Future<RecordMetadata> result = producer.send(new ProducerRecord<K, V>(topic, key, message), callback);
-                if (!async) {
-                    try {
-                        result.get();
-                        collector.ack(input);
-                    } catch (ExecutionException err) {
-                        collector.reportError(err);
-                        collector.fail(input);
-                    }
-                } else if (fireAndForget) {
-                    collector.ack(input);
-                }
-            } else {
-                LOG.warn("skipping key = " + key + ", topic selector returned null.");
-                collector.ack(input);
-            }
-        } catch (Exception ex) {
-            collector.reportError(ex);
-            collector.fail(input);
-        }
-    }
-
-    @Override
-    public void declareOutputFields(OutputFieldsDeclarer declarer) {
-
-    }
-
-    @Override
-    public void cleanup() {
-        producer.close();
-    }
-
-    public void setFireAndForget(boolean fireAndForget) {
-        this.fireAndForget = fireAndForget;
-    }
-
-    public void setAsync(boolean async) {
-        this.async = async;
-    }
-}
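Per the @deprecated note above, the replacement KafkaBolt lives in the storm-kafka-client artifact under the same org.apache.storm.kafka.bolt package, so migration is mostly a dependency change. A hedged sketch, assuming the storm-kafka-client bolt of the same release mirrors the fluent setters shown above; the broker address and topic are placeholders:

    // Producer config for the storm-kafka-client KafkaBolt; values are illustrative.
    Properties props = new Properties();
    props.put("bootstrap.servers", "kafka1.example.com:9092");
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

    KafkaBolt<String, String> bolt = new KafkaBolt<String, String>()
        .withProducerProperties(props)
        .withTopicSelector(new DefaultTopicSelector("events"))
        .withTupleToKafkaMapper(new FieldNameBasedTupleToKafkaMapper<String, String>());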