Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:36 UTC

[40/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs to org.apache.storm, but try to provide some form of backwards compatibility

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
index b76e230..d2ca5b8 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
@@ -17,9 +17,9 @@
  */
 package org.apache.storm.jdbc.trident.state;
 
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
 
 import java.util.List;
 

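The pattern in this hunk repeats across the whole commit: Trident classes move from the old storm.trident package to org.apache.storm.trident. As an illustrative sketch (not part of this commit; the class name is hypothetical), a downstream state updater only needs its import lines rewritten:

    // Old imports, pre-STORM-1202:
    //   import storm.trident.operation.TridentCollector;
    //   import storm.trident.state.BaseStateUpdater;
    //   import storm.trident.tuple.TridentTuple;
    // New imports after the migration:
    import org.apache.storm.trident.operation.TridentCollector;
    import org.apache.storm.trident.state.BaseStateUpdater;
    import org.apache.storm.trident.state.State;
    import org.apache.storm.trident.tuple.TridentTuple;

    import java.util.List;

    // Hypothetical user class: the body compiles unchanged, only the imports move.
    public class LoggingStateUpdater extends BaseStateUpdater<State> {
        @Override
        public void updateState(State state, List<TridentTuple> tuples, TridentCollector collector) {
            for (TridentTuple tuple : tuples) {
                System.out.println("updated state with tuple: " + tuple);
            }
        }
    }
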
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java
index 1fda3b1..9a5ec09 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java
@@ -17,7 +17,7 @@
  */
 package org.apache.storm.jdbc.bolt;
 
-import backtype.storm.tuple.Fields;
+import org.apache.storm.tuple.Fields;
 import com.google.common.collect.Lists;
 import org.apache.storm.jdbc.common.Column;
 import org.apache.storm.jdbc.common.ConnectionProvider;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
index 718917a..fdcd053 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
@@ -17,12 +17,12 @@
  */
 package org.apache.storm.jdbc.spout;
 
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
 import com.google.common.collect.Lists;
 
 import java.util.*;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index 9df5a86..ec7ca36 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -17,10 +17,10 @@
  */
 package org.apache.storm.jdbc.topology;
 
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
 import org.apache.storm.jdbc.common.Column;
@@ -32,7 +32,7 @@ import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
 import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
 import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
 import org.apache.storm.jdbc.spout.UserSpout;
-import backtype.storm.LocalCluster;
+import org.apache.storm.LocalCluster;
 
 import java.sql.Types;
 import java.util.List;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 585994e..1915219 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.jdbc.topology;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
 import com.google.common.collect.Lists;
 import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
 import org.apache.storm.jdbc.bolt.JdbcLookupBolt;

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
index 522d41a..11269c3 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -17,8 +17,8 @@
  */
 package org.apache.storm.jdbc.topology;
 
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
 import com.google.common.collect.Lists;
 import org.apache.storm.jdbc.common.Column;
 import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
@@ -27,9 +27,9 @@ import org.apache.storm.jdbc.trident.state.JdbcQuery;
 import org.apache.storm.jdbc.trident.state.JdbcState;
 import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
 import org.apache.storm.jdbc.trident.state.JdbcUpdater;
-import storm.trident.Stream;
-import storm.trident.TridentState;
-import storm.trident.TridentTopology;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
 
 import java.sql.Types;
 

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 3dfa0b7..3a86cf0 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -65,7 +65,7 @@ In addition to these parameters, SpoutConfig contains the following fields that
 
     // Exponential back-off retry settings.  These are used when retrying messages after a bolt
     // calls OutputCollector.fail().
-    // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
+    // Note: be sure to set org.apache.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
     // resubmitting the message while still retrying.
     public long retryInitialDelayMs = 0;
     public double retryDelayMultiplier = 1.0;
@@ -190,9 +190,9 @@ use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependen
 Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
 
 ##Writing to Kafka as part of your topology
-You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you
-are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and
-storm.kafka.trident.TridentKafkaUpdater.
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology. If you
+are using Trident, you can use org.apache.storm.kafka.trident.TridentState, org.apache.storm.kafka.trident.TridentStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
 
 You need to provide an implementation of the following two interfaces
 

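To pair the README's back-off note with the message timeout it warns about, here is a hedged sketch (not part of this commit; it assumes the usual SpoutConfig(BrokerHosts, topic, zkRoot, id) constructor, and all names and addresses are placeholders):

    import org.apache.storm.Config;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;

    BrokerHosts hosts = new ZkHosts("zkhost:2181");   // placeholder ZooKeeper ensemble
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "spout-id");
    spoutConfig.retryInitialDelayMs = 100;     // first retry after ~100ms
    spoutConfig.retryDelayMultiplier = 2.0;    // then 200ms, 400ms, 800ms, ...
    spoutConfig.retryDelayMaxMs = 10 * 1000;   // capped at 10s per retry

    Config conf = new Config();
    // Per the note above, keep the message timeout comfortably larger than the
    // longest expected retry delay so tuples are not re-emitted while still retrying.
    conf.put(Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS, 60);
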
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
new file mode 100644
index 0000000..0d95e8d
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import com.google.common.base.Objects;
+
+import java.io.Serializable;
+
+public class Broker implements Serializable, Comparable<Broker> {
+    public String host;
+    public int port;
+
+    // for kryo compatibility
+    private Broker() {
+
+    }
+
+    public Broker(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    public Broker(String host) {
+        this(host, 9092);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(host, port);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final Broker other = (Broker) obj;
+        return Objects.equal(this.host, other.host) && Objects.equal(this.port, other.port);
+    }
+
+    @Override
+    public String toString() {
+        return host + ":" + port;
+    }
+
+    public static Broker fromString(String host) {
+        Broker hp;
+        String[] spec = host.split(":");
+        if (spec.length == 1) {
+            hp = new Broker(spec[0]);
+        } else if (spec.length == 2) {
+            hp = new Broker(spec[0], Integer.parseInt(spec[1]));
+        } else {
+            throw new IllegalArgumentException("Invalid host specification: " + host);
+        }
+        return hp;
+    }
+
+
+    @Override
+    public int compareTo(Broker o) {
+        if (this.host.equals(o.host)) {
+            return this.port - o.port;
+        } else {
+            return this.host.compareTo(o.host);
+        }
+    }
+}

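Broker.fromString accepts either "host" or "host:port", defaulting the port to 9092; ordering is by host, then port. A quick illustrative usage (hostnames are placeholders):

    import org.apache.storm.kafka.Broker;

    public class BrokerDemo {
        public static void main(String[] args) {
            Broker a = Broker.fromString("kafka01.example.com");      // port defaults to 9092
            Broker b = Broker.fromString("kafka02.example.com:9093"); // explicit port
            System.out.println(a);                   // kafka01.example.com:9092
            System.out.println(b);                   // kafka02.example.com:9093
            System.out.println(a.compareTo(b) < 0);  // true: compared by host first
        }
    }
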
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
new file mode 100644
index 0000000..13ba0a1
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.io.Serializable;
+
+
+public interface BrokerHosts extends Serializable {
+
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
new file mode 100644
index 0000000..2a18a7f
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.utils.Utils;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+public class ByteBufferSerializer implements Serializer<ByteBuffer> {
+  @Override
+  public void configure(Map<String, ?> map, boolean b) {
+
+  }
+
+  @Override
+  public void close() {
+
+  }
+
+  @Override
+  public byte[] serialize(String s, ByteBuffer b) {
+    return Utils.toByteArray(b);
+  }
+}

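ByteBufferSerializer delegates to Storm's Utils.toByteArray, which copies the buffer's remaining bytes; the topic argument is ignored. A minimal round-trip sketch (not part of this commit):

    import org.apache.storm.kafka.ByteBufferSerializer;

    import java.nio.ByteBuffer;
    import java.util.Arrays;

    public class ByteBufferSerializerDemo {
        public static void main(String[] args) {
            ByteBuffer buf = ByteBuffer.wrap(new byte[]{1, 2, 3, 4});
            byte[] out = new ByteBufferSerializer().serialize("any-topic", buf);
            System.out.println(Arrays.toString(out)); // [1, 2, 3, 4]
        }
    }
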
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
new file mode 100644
index 0000000..0fc85b3
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+
+import java.io.UnsupportedEncodingException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class DynamicBrokersReader {
+
+    public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
+
+    private CuratorFramework _curator;
+    private String _zkPath;
+    private String _topic;
+    private Boolean _isWildcardTopic;
+
+    public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
+        // Check required parameters
+        Preconditions.checkNotNull(conf, "conf cannot be null");
+
+        validateConfig(conf);
+
+        Preconditions.checkNotNull(zkStr,"zkString cannot be null");
+        Preconditions.checkNotNull(zkPath, "zkPath cannot be null");
+        Preconditions.checkNotNull(topic, "topic cannot be null");
+
+        _zkPath = zkPath;
+        _topic = topic;
+        _isWildcardTopic = Utils.getBoolean(conf.get("kafka.topic.wildcard.match"), false);
+        try {
+            _curator = CuratorFrameworkFactory.newClient(
+                    zkStr,
+                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
+                    new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+                            Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+            _curator.start();
+        } catch (Exception ex) {
+            LOG.error("Couldn't connect to zookeeper", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Get all partitions with their current leaders
+     */
+    public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
+      List<String> topics =  getTopics();
+      List<GlobalPartitionInformation> partitions =  new ArrayList<GlobalPartitionInformation>();
+
+      for (String topic : topics) {
+          GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
+          try {
+              int numPartitionsForTopic = getNumPartitions(topic);
+              String brokerInfoPath = brokerPath();
+              for (int partition = 0; partition < numPartitionsForTopic; partition++) {
+                  int leader = getLeaderFor(topic,partition);
+                  String path = brokerInfoPath + "/" + leader;
+                  try {
+                      byte[] brokerData = _curator.getData().forPath(path);
+                      Broker hp = getBrokerHost(brokerData);
+                      globalPartitionInformation.addPartition(partition, hp);
+                  } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
+                      LOG.error("Node {} does not exist ", path);
+                  }
+              }
+          } catch (SocketTimeoutException e) {
+              throw e;
+          } catch (Exception e) {
+              throw new RuntimeException(e);
+          }
+          LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
+          partitions.add(globalPartitionInformation);
+      }
+        return partitions;
+    }
+
+    private int getNumPartitions(String topic) {
+        try {
+            String topicBrokersPath = partitionPath(topic);
+            List<String> children = _curator.getChildren().forPath(topicBrokersPath);
+            return children.size();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    private List<String> getTopics() {
+        List<String> topics = new ArrayList<String>();
+        if (!_isWildcardTopic) {
+            topics.add(_topic);
+            return topics;
+        } else {
+            try {
+                List<String> children = _curator.getChildren().forPath(topicsPath());
+                for (String t : children) {
+                    if (t.matches(_topic)) {
+                        LOG.info(String.format("Found matching topic %s", t));
+                        topics.add(t);
+                    }
+                }
+                return topics;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public String topicsPath () {
+        return _zkPath + "/topics";
+    }
+    public String partitionPath(String topic) {
+        return topicsPath() + "/" + topic + "/partitions";
+    }
+
+    public String brokerPath() {
+        return _zkPath + "/ids";
+    }
+
+
+
+    /**
+     * get /brokers/topics/distributedTopic/partitions/1/state
+     * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
+     * @param topic
+     * @param partition
+     * @return
+     */
+    private int getLeaderFor(String topic, long partition) {
+        try {
+            String topicBrokersPath = partitionPath(topic);
+            byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
+            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
+            Integer leader = ((Number) value.get("leader")).intValue();
+            if (leader == -1) {
+                throw new RuntimeException("No leader found for partition " + partition);
+            }
+            return leader;
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    public void close() {
+        _curator.close();
+    }
+
+    /**
+     * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
+     * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
+     *
+     * @param contents
+     * @return
+     */
+    private Broker getBrokerHost(byte[] contents) {
+        try {
+            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
+            String host = (String) value.get("host");
+            Integer port = ((Long) value.get("port")).intValue();
+            return new Broker(host, port);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Validate required parameters in the input configuration Map
+     * @param conf
+     */
+    private void validateConfig(final Map conf) {
+        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT),
+                "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT);
+        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT),
+                "%s cannot be null", Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT);
+        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES),
+                "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_TIMES);
+        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL),
+                "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_INTERVAL);
+    }
+
+}

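A hedged usage sketch for DynamicBrokersReader (not part of this commit; the ZooKeeper address is a placeholder and "/brokers" is Kafka's conventional registry path). The four Config entries are exactly the ones validateConfig demands:

    import org.apache.storm.Config;
    import org.apache.storm.kafka.DynamicBrokersReader;
    import org.apache.storm.kafka.trident.GlobalPartitionInformation;

    import java.util.HashMap;
    import java.util.List;
    import java.util.Map;

    public class BrokerReaderDemo {
        public static void main(String[] args) throws Exception {
            Map<String, Object> conf = new HashMap<String, Object>();
            conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
            conf.put(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT, 15000);
            conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 5);
            conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 1000);

            DynamicBrokersReader reader =
                    new DynamicBrokersReader(conf, "zkhost:2181", "/brokers", "my-topic");
            try {
                // One GlobalPartitionInformation per matched topic,
                // mapping each partition to its leader broker.
                List<GlobalPartitionInformation> info = reader.getBrokerInfo();
                System.out.println(info);
            } finally {
                reader.close();
            }
        }
    }
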
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
new file mode 100644
index 0000000..6d30139
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.IBrokerReader;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+public class DynamicPartitionConnections {
+
+    public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
+
+    static class ConnectionInfo {
+        SimpleConsumer consumer;
+        Set<String> partitions = new HashSet<String>();
+
+        public ConnectionInfo(SimpleConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    Map<Broker, ConnectionInfo> _connections = new HashMap();
+    KafkaConfig _config;
+    IBrokerReader _reader;
+
+    public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) {
+        _config = config;
+        _reader = brokerReader;
+    }
+
+    public SimpleConsumer register(Partition partition) {
+        Broker broker = _reader.getBrokerForTopic(partition.topic).getBrokerFor(partition.partition);
+        return register(broker, partition.topic, partition.partition);
+    }
+
+    public SimpleConsumer register(Broker host, String topic, int partition) {
+        if (!_connections.containsKey(host)) {
+            _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
+        }
+        ConnectionInfo info = _connections.get(host);
+        info.partitions.add(getHashKey(topic,partition));
+        return info.consumer;
+    }
+
+    public SimpleConsumer getConnection(Partition partition) {
+        ConnectionInfo info = _connections.get(partition.host);
+        if (info != null) {
+            return info.consumer;
+        }
+        return null;
+    }
+
+    public void unregister(Broker port, String topic, int partition) {
+        ConnectionInfo info = _connections.get(port);
+        info.partitions.remove(getHashKey(topic,partition));
+        if (info.partitions.isEmpty()) {
+            info.consumer.close();
+            _connections.remove(port);
+        }
+    }
+
+    public void unregister(Partition partition) {
+        unregister(partition.host, partition.topic, partition.partition);
+    }
+
+    public void clear() {
+        for (ConnectionInfo info : _connections.values()) {
+            info.consumer.close();
+        }
+        _connections.clear();
+    }
+
+    private String getHashKey(String topic, int partition) {
+        return topic + "_" + partition;
+    }
+}

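DynamicPartitionConnections reference-counts one SimpleConsumer per broker, keyed by "topic_partition" strings. A hedged lifecycle sketch (not part of this commit; 'config' and 'stormConf' are assumed to exist, and KafkaUtils.makeBrokerReader appears later in this commit):

    import kafka.javaapi.consumer.SimpleConsumer;
    import org.apache.storm.kafka.Broker;
    import org.apache.storm.kafka.DynamicPartitionConnections;
    import org.apache.storm.kafka.KafkaUtils;

    // 'config' (a KafkaConfig) and 'stormConf' (a Map) are assumed to be built elsewhere.
    DynamicPartitionConnections connections =
            new DynamicPartitionConnections(config, KafkaUtils.makeBrokerReader(stormConf, config));

    Broker broker = new Broker("kafka01.example.com", 9092);   // placeholder broker
    SimpleConsumer consumer = connections.register(broker, "my-topic", 0); // opens the consumer
    SimpleConsumer same = connections.register(broker, "my-topic", 1);     // reuses the same one
    connections.unregister(broker, "my-topic", 0);  // still one partition registered
    connections.unregister(broker, "my-topic", 1);  // last one out closes the consumer
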
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
new file mode 100644
index 0000000..f86d624
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
+
+    private final long retryInitialDelayMs;
+    private final double retryDelayMultiplier;
+    private final long retryDelayMaxMs;
+
+    private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+    private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
+
+    public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
+        this.retryInitialDelayMs = retryInitialDelayMs;
+        this.retryDelayMultiplier = retryDelayMultiplier;
+        this.retryDelayMaxMs = retryDelayMaxMs;
+    }
+
+    @Override
+    public void failed(Long offset) {
+        MessageRetryRecord oldRecord = this.records.get(offset);
+        MessageRetryRecord newRecord = oldRecord == null ?
+                                       new MessageRetryRecord(offset) :
+                                       oldRecord.createNextRetryRecord();
+        this.records.put(offset, newRecord);
+        this.waiting.add(newRecord);
+    }
+
+    @Override
+    public void acked(Long offset) {
+        MessageRetryRecord record = this.records.remove(offset);
+        if (record != null) {
+            this.waiting.remove(record);
+        }
+    }
+
+    @Override
+    public void retryStarted(Long offset) {
+        MessageRetryRecord record = this.records.get(offset);
+        if (record == null || !this.waiting.contains(record)) {
+            throw new IllegalStateException("cannot retry a message that has not failed");
+        } else {
+            this.waiting.remove(record);
+        }
+    }
+
+    @Override
+    public Long nextFailedMessageToRetry() {
+        if (this.waiting.size() > 0) {
+            MessageRetryRecord first = this.waiting.peek();
+            if (System.currentTimeMillis() >= first.retryTimeUTC) {
+                if (this.records.containsKey(first.offset)) {
+                    return first.offset;
+                } else {
+                    // defensive programming - should be impossible
+                    this.waiting.remove(first);
+                    return nextFailedMessageToRetry();
+                }
+            }
+        }
+        return null;
+    }
+
+    @Override
+    public boolean shouldRetryMsg(Long offset) {
+        MessageRetryRecord record = this.records.get(offset);
+        return record != null &&
+                this.waiting.contains(record) &&
+                System.currentTimeMillis() >= record.retryTimeUTC;
+    }
+
+    @Override
+    public Set<Long> clearInvalidMessages(Long kafkaOffset) {
+        Set<Long> invalidOffsets = new HashSet<Long>(); 
+        for(Long offset : records.keySet()){
+            if(offset < kafkaOffset){
+                MessageRetryRecord record = this.records.remove(offset);
+                if (record != null) {
+                    this.waiting.remove(record);
+                    invalidOffsets.add(offset);
+                }
+            }
+        }
+        return invalidOffsets;
+    }
+
+    /**
+     * A MessageRetryRecord holds the data of how many times a message has
+     * failed and been retried, and when the last failure occurred.  It can
+     * determine whether it is ready to be retried by employing an exponential
+     * back-off calculation using config values stored in SpoutConfig:
+     * <ul>
+     *  <li>retryInitialDelayMs - time to delay before the first retry</li>
+     *  <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li>
+     *  <li>retryDelayMaxMs - maximum retry delay (once this delay time is reached, subsequent retries will
+     *                        delay for this amount of time every time)
+     *  </li>
+     * </ul>
+     */
+    private class MessageRetryRecord {
+        private final long offset;
+        private final int retryNum;
+        private final long retryTimeUTC;
+
+        public MessageRetryRecord(long offset) {
+            this(offset, 1);
+        }
+
+        private MessageRetryRecord(long offset, int retryNum) {
+            this.offset = offset;
+            this.retryNum = retryNum;
+            this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
+        }
+
+        /**
+         * Create a MessageRetryRecord for the next retry that should occur after this one.
+         * @return MessageRetryRecord with the next retry time, or null to indicate that another
+         *         retry should not be performed.  The latter case can happen if we are about to
+         *         run into the org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS in the Storm
+         *         configuration.
+         */
+        public MessageRetryRecord createNextRetryRecord() {
+            return new MessageRetryRecord(this.offset, this.retryNum + 1);
+        }
+
+        private long calculateRetryDelay() {
+            double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
+            double delay = retryInitialDelayMs * delayMultiplier;
+            Long maxLong = Long.MAX_VALUE;
+            long delayThisRetryMs = delay >= maxLong.doubleValue()
+                                    ?  maxLong
+                                    : (long) delay;
+            return Math.min(delayThisRetryMs, retryDelayMaxMs);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return (other instanceof MessageRetryRecord
+                    && this.offset == ((MessageRetryRecord) other).offset);
+        }
+
+        @Override
+        public int hashCode() {
+            return Long.valueOf(this.offset).hashCode();
+        }
+    }
+
+    private static class RetryTimeComparator implements Comparator<MessageRetryRecord> {
+
+        @Override
+        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
+            return Long.valueOf(record1.retryTimeUTC).compareTo(Long.valueOf(record2.retryTimeUTC));
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            return false;
+        }
+    }
+}

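The back-off in calculateRetryDelay is delay = retryInitialDelayMs * retryDelayMultiplier^(retryNum - 1), capped at retryDelayMaxMs; with a 100ms initial delay and a 2.0 multiplier the schedule is 100, 200, 400, ... ms. A small lifecycle sketch (not part of this commit):

    import org.apache.storm.kafka.ExponentialBackoffMsgRetryManager;

    public class RetryDemo {
        public static void main(String[] args) throws InterruptedException {
            // 100ms initial delay, doubling per retry, capped at 1s.
            ExponentialBackoffMsgRetryManager mgr =
                    new ExponentialBackoffMsgRetryManager(100, 2.0, 1000);

            mgr.failed(42L);                              // first failure: retry due in ~100ms
            System.out.println(mgr.shouldRetryMsg(42L));  // false: back-off has not elapsed yet

            Thread.sleep(150);
            System.out.println(mgr.shouldRetryMsg(42L));        // true
            System.out.println(mgr.nextFailedMessageToRetry()); // 42

            mgr.retryStarted(42L);  // the spout re-emits the message
            mgr.acked(42L);         // a downstream ack clears the record
        }
    }
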
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
new file mode 100644
index 0000000..448d0c3
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+public class FailedFetchException extends RuntimeException {
+
+    public FailedFetchException(String message) {
+        super(message);
+    }
+
+    public FailedFetchException(Exception e) {
+        super(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
new file mode 100644
index 0000000..e9a7092
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.util.Set;
+
+public interface FailedMsgRetryManager {
+    public void failed(Long offset);
+    public void acked(Long offset);
+    public void retryStarted(Long offset);
+    public Long nextFailedMessageToRetry();
+    public boolean shouldRetryMsg(Long offset);
+    public Set<Long> clearInvalidMessages(Long kafkaOffset);
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
new file mode 100644
index 0000000..75f5563
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.util.Map;
+
+public class IntSerializer implements Serializer<Integer> {
+  @Override
+  public void configure(Map<String, ?> map, boolean b) {
+  }
+
+  @Override
+  public byte[] serialize(String topic, Integer val) {
+    byte[] r = new byte[4];
+    IntBuffer b = ByteBuffer.wrap(r).asIntBuffer();
+    b.put(val);
+    return r;
+  }
+
+  @Override
+  public void close() {
+  }
+}

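IntSerializer writes the int as four bytes in ByteBuffer's default big-endian order. A round-trip sketch (not part of this commit):

    import org.apache.storm.kafka.IntSerializer;

    import java.nio.ByteBuffer;

    public class IntSerializerDemo {
        public static void main(String[] args) {
            byte[] bytes = new IntSerializer().serialize("any-topic", 1234567);
            System.out.println(bytes.length);                    // 4
            System.out.println(ByteBuffer.wrap(bytes).getInt()); // 1234567
        }
    }
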
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
new file mode 100644
index 0000000..e1e1d24
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.MultiScheme;
+import org.apache.storm.spout.RawMultiScheme;
+
+import java.io.Serializable;
+
+public class KafkaConfig implements Serializable {
+    private static final long serialVersionUID = 5276718734571623855L;
+    
+    public final BrokerHosts hosts;
+    public final String topic;
+    public final String clientId;
+
+    public int fetchSizeBytes = 1024 * 1024;
+    public int socketTimeoutMs = 10000;
+    public int fetchMaxWait = 10000;
+    public int bufferSizeBytes = 1024 * 1024;
+    public MultiScheme scheme = new RawMultiScheme();
+    public boolean ignoreZkOffsets = false;
+    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+    public long maxOffsetBehind = Long.MAX_VALUE;
+    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
+    public int metricsTimeBucketSizeInSecs = 60;
+
+    public KafkaConfig(BrokerHosts hosts, String topic) {
+        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
+    }
+
+    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
+        this.hosts = hosts;
+        this.topic = topic;
+        this.clientId = clientId;
+    }
+
+}

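All KafkaConfig tuning knobs are public fields with the defaults shown above. A hedged construction sketch (not part of this commit; ZkHosts is the usual BrokerHosts implementation referenced later in this commit, and the address is a placeholder):

    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaConfig;
    import org.apache.storm.kafka.ZkHosts;

    BrokerHosts hosts = new ZkHosts("zkhost:2181");
    KafkaConfig config = new KafkaConfig(hosts, "my-topic");       // clientId uses Kafka's default
    config.fetchSizeBytes = 2 * 1024 * 1024;                       // raise the per-fetch size to 2MB
    config.startOffsetTime = kafka.api.OffsetRequest.LatestTime(); // start from the newest offset
    config.ignoreZkOffsets = true;                                 // prefer startOffsetTime over stored offsets
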
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
new file mode 100644
index 0000000..1d866e7
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+public enum KafkaError {
+    NO_ERROR,
+    OFFSET_OUT_OF_RANGE,
+    INVALID_MESSAGE,
+    UNKNOWN_TOPIC_OR_PARTITION,
+    INVALID_FETCH_SIZE,
+    LEADER_NOT_AVAILABLE,
+    NOT_LEADER_FOR_PARTITION,
+    REQUEST_TIMED_OUT,
+    BROKER_NOT_AVAILABLE,
+    REPLICA_NOT_AVAILABLE,
+    MESSAGE_SIZE_TOO_LARGE,
+    STALE_CONTROLLER_EPOCH,
+    OFFSET_METADATA_TOO_LARGE,
+    UNKNOWN;
+
+    public static KafkaError getError(int errorCode) {
+        if (errorCode < 0 || errorCode >= UNKNOWN.ordinal()) {
+            return UNKNOWN;
+        } else {
+            return values()[errorCode];
+        }
+    }
+}

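getError maps Kafka's numeric error codes onto the enum by ordinal, so the declaration order above must track the protocol's code numbering; out-of-range codes collapse to UNKNOWN. For example:

    import org.apache.storm.kafka.KafkaError;

    public class KafkaErrorDemo {
        public static void main(String[] args) {
            System.out.println(KafkaError.getError(0));  // NO_ERROR
            System.out.println(KafkaError.getError(1));  // OFFSET_OUT_OF_RANGE
            System.out.println(KafkaError.getError(99)); // UNKNOWN
            System.out.println(KafkaError.getError(-1)); // UNKNOWN
        }
    }
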
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
new file mode 100644
index 0000000..7a83ae0
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import com.google.common.base.Strings;
+import kafka.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
+
+import java.util.*;
+
+// TODO: need to add blacklisting
+// TODO: need to make a best effort to not re-emit messages if don't have to
+public class KafkaSpout extends BaseRichSpout {
+    static enum EmitState {
+        EMITTED_MORE_LEFT,
+        EMITTED_END,
+        NO_EMITTED
+    }
+
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+
+    SpoutConfig _spoutConfig;
+    SpoutOutputCollector _collector;
+    PartitionCoordinator _coordinator;
+    DynamicPartitionConnections _connections;
+    ZkState _state;
+
+    long _lastUpdateMs = 0;
+
+    int _currPartitionIndex = 0;
+
+    public KafkaSpout(SpoutConfig spoutConf) {
+        _spoutConfig = spoutConf;
+    }
+
+    @Override
+    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        _collector = collector;
+        String topologyInstanceId = context.getStormId();
+        Map stateConf = new HashMap(conf);
+        List<String> zkServers = _spoutConfig.zkServers;
+        if (zkServers == null) {
+            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        }
+        Integer zkPort = _spoutConfig.zkPort;
+        if (zkPort == null) {
+            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+        }
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
+        _state = new ZkState(stateConf);
+
+        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
+
+        // using TransactionalState like this is a hack
+        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
+        if (_spoutConfig.hosts instanceof StaticHosts) {
+            _coordinator = new StaticCoordinator(_connections, conf,
+                    _spoutConfig, _state, context.getThisTaskIndex(),
+                    totalTasks, topologyInstanceId);
+        } else {
+            _coordinator = new ZkCoordinator(_connections, conf,
+                    _spoutConfig, _state, context.getThisTaskIndex(),
+                    totalTasks, topologyInstanceId);
+        }
+
+        context.registerMetric("kafkaOffset", new IMetric() {
+            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
+
+            @Override
+            public Object getValueAndReset() {
+                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
+                Set<Partition> latestPartitions = new HashSet();
+                for (PartitionManager pm : pms) {
+                    latestPartitions.add(pm.getPartition());
+                }
+                _kafkaOffsetMetric.refreshPartitions(latestPartitions);
+                for (PartitionManager pm : pms) {
+                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
+                }
+                return _kafkaOffsetMetric.getValueAndReset();
+            }
+        }, _spoutConfig.metricsTimeBucketSizeInSecs);
+
+        context.registerMetric("kafkaPartition", new IMetric() {
+            @Override
+            public Object getValueAndReset() {
+                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
+                Map concatMetricsDataMaps = new HashMap();
+                for (PartitionManager pm : pms) {
+                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
+                }
+                return concatMetricsDataMaps;
+            }
+        }, _spoutConfig.metricsTimeBucketSizeInSecs);
+    }
+
+    @Override
+    public void close() {
+        _state.close();
+    }
+
+    @Override
+    public void nextTuple() {
+        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
+        for (int i = 0; i < managers.size(); i++) {
+
+            try {
+                // in case the number of managers decreased
+                _currPartitionIndex = _currPartitionIndex % managers.size();
+                EmitState state = managers.get(_currPartitionIndex).next(_collector);
+                if (state != EmitState.EMITTED_MORE_LEFT) {
+                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
+                }
+                if (state != EmitState.NO_EMITTED) {
+                    break;
+                }
+            } catch (FailedFetchException e) {
+                LOG.warn("Fetch failed", e);
+                _coordinator.refresh();
+            }
+        }
+
+        long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
+
+        /*
+             Because System.currentTimeMillis() depends on the system clock, we also
+             check for a negative diffWithNow in case the clock was changed externally.
+         */
+        if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
+            commit();
+        }
+    }
+
+    @Override
+    public void ack(Object msgId) {
+        KafkaMessageId id = (KafkaMessageId) msgId;
+        PartitionManager m = _coordinator.getManager(id.partition);
+        if (m != null) {
+            m.ack(id.offset);
+        }
+    }
+
+    @Override
+    public void fail(Object msgId) {
+        KafkaMessageId id = (KafkaMessageId) msgId;
+        PartitionManager m = _coordinator.getManager(id.partition);
+        if (m != null) {
+            m.fail(id.offset);
+        }
+    }
+
+    @Override
+    public void deactivate() {
+        commit();
+    }
+
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+       if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
+            declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields());
+        } else {
+            declarer.declare(_spoutConfig.scheme.getOutputFields());
+        }
+    }
+
+    private void commit() {
+        _lastUpdateMs = System.currentTimeMillis();
+        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
+            manager.commit();
+        }
+    }
+
+}

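A hedged wiring sketch for KafkaSpout (not part of this commit; it assumes the SpoutConfig(BrokerHosts, topic, zkRoot, id) constructor, and all names and addresses are placeholders):

    import org.apache.storm.Config;
    import org.apache.storm.LocalCluster;
    import org.apache.storm.kafka.BrokerHosts;
    import org.apache.storm.kafka.KafkaSpout;
    import org.apache.storm.kafka.SpoutConfig;
    import org.apache.storm.kafka.ZkHosts;
    import org.apache.storm.topology.TopologyBuilder;

    public class KafkaTopologyDemo {
        public static void main(String[] args) throws Exception {
            BrokerHosts hosts = new ZkHosts("zkhost:2181");
            SpoutConfig spoutConfig = new SpoutConfig(hosts, "my-topic", "/kafka-spout", "my-spout-id");

            TopologyBuilder builder = new TopologyBuilder();
            // One spout executor per Kafka partition is a typical starting point.
            builder.setSpout("kafka-spout", new KafkaSpout(spoutConfig), 4);
            // builder.setBolt(...) consumers of the spout's stream would attach here.

            LocalCluster cluster = new LocalCluster();
            cluster.submitTopology("kafka-demo", new Config(), builder.createTopology());
        }
    }
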
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
new file mode 100644
index 0000000..8cd0fd0
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.utils.Utils;
+import com.google.common.base.Preconditions;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+import org.apache.storm.kafka.trident.IBrokerReader;
+import org.apache.storm.kafka.trident.StaticBrokerReader;
+import org.apache.storm.kafka.trident.ZkBrokerReader;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.*;
+
+
+public class KafkaUtils {
+
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+    private static final int NO_OFFSET = -5;
+
+
+    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
+        if (conf.hosts instanceof StaticHosts) {
+            return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation());
+        } else {
+            return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
+        }
+    }
+
+
+    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
+        long startOffsetTime = config.startOffsetTime;
+        return getOffset(consumer, topic, partition, startOffsetTime);
+    }
+
+    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
+        OffsetRequest request = new OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+
+        long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
+        if (offsets.length > 0) {
+            return offsets[0];
+        } else {
+            return NO_OFFSET;
+        }
+    }
+
+    public static class KafkaOffsetMetric implements IMetric {
+        Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
+        Set<Partition> _partitions;
+        DynamicPartitionConnections _connections;
+
+        public KafkaOffsetMetric(DynamicPartitionConnections connections) {
+            _connections = connections;
+        }
+
+        public void setLatestEmittedOffset(Partition partition, long offset) {
+            _partitionToOffset.put(partition, offset);
+        }
+
+        private class TopicMetrics {
+            long totalSpoutLag = 0;
+            long totalEarliestTimeOffset = 0;
+            long totalLatestTimeOffset = 0;
+            long totalLatestEmittedOffset = 0;
+        }
+
+        @Override
+        public Object getValueAndReset() {
+            try {
+                HashMap ret = new HashMap();
+                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
+                    Map<String,TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>();
+                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
+                        Partition partition = e.getKey();
+                        SimpleConsumer consumer = _connections.getConnection(partition);
+                        if (consumer == null) {
+                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
+                            return null;
+                        }
+                        long latestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
+                        long earliestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
+                        if (latestTimeOffset == KafkaUtils.NO_OFFSET) {
+                            LOG.warn("No data found in Kafka Partition " + partition.getId());
+                            return null;
+                        }
+                        long latestEmittedOffset = e.getValue();
+                        long spoutLag = latestTimeOffset - latestEmittedOffset;
+                        String topic = partition.topic;
+                        String metricPath = partition.getId();
+                        // Handle the case where the partition path id does not contain the topic name (legacy Partition.getId() == "partition_" + partition)
+                        if (!metricPath.startsWith(topic + "/")) {
+                            metricPath = topic + "/" + metricPath;
+                        }
+                        ret.put(metricPath + "/" + "spoutLag", spoutLag);
+                        ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+                        ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+                        ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+
+                        if (!topicMetricsMap.containsKey(partition.topic)) {
+                            topicMetricsMap.put(partition.topic, new TopicMetrics());
+                        }
+
+                        TopicMetrics topicMetrics = topicMetricsMap.get(partition.topic);
+                        topicMetrics.totalSpoutLag += spoutLag;
+                        topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
+                        topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+                        topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
+                    }
+
+                    for(Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
+                        String topic = e.getKey();
+                        TopicMetrics topicMetrics = e.getValue();
+                        ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
+                        ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
+                        ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
+                        ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
+                    }
+
+                    return ret;
+                } else {
+                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
+                }
+            } catch (Throwable t) {
+                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
+            }
+            return null;
+        }
+
+        public void refreshPartitions(Set<Partition> partitions) {
+            _partitions = partitions;
+            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
+            while (it.hasNext()) {
+                if (!partitions.contains(it.next())) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)
+            throws TopicOffsetOutOfRangeException, FailedFetchException, RuntimeException {
+        ByteBufferMessageSet msgs = null;
+        String topic = partition.topic;
+        int partitionId = partition.partition;
+        FetchRequestBuilder builder = new FetchRequestBuilder();
+        FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
+                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
+        FetchResponse fetchResponse;
+        try {
+            fetchResponse = consumer.fetch(fetchRequest);
+        } catch (Exception e) {
+            if (e instanceof ConnectException ||
+                    e instanceof SocketTimeoutException ||
+                    e instanceof IOException ||
+                    e instanceof UnresolvedAddressException
+                    ) {
+                LOG.warn("Network error when fetching messages:", e);
+                throw new FailedFetchException(e);
+            } else {
+                throw new RuntimeException(e);
+            }
+        }
+        if (fetchResponse.hasError()) {
+            KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
+            if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
+                String msg = partition + " Got fetch request with offset out of range: [" + offset + "]";
+                LOG.warn(msg);
+                throw new TopicOffsetOutOfRangeException(msg);
+            } else {
+                String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
+                LOG.error(message);
+                throw new FailedFetchException(message);
+            }
+        } else {
+            msgs = fetchResponse.messageSet(topic, partitionId);
+        }
+        return msgs;
+    }
+
+
+    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) {
+        Iterable<List<Object>> tups;
+        ByteBuffer payload = msg.payload();
+        if (payload == null) {
+            return null;
+        }
+        ByteBuffer key = msg.key();
+        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
+            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
+        } else {
+            if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
+                tups = ((StringMultiSchemeWithTopic) kafkaConfig.scheme).deserializeWithTopic(topic, payload);
+            } else {
+                tups = kafkaConfig.scheme.deserialize(payload);
+            }
+        }
+        return tups;
+    }
+    
+    public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
+        ByteBuffer payload = msg.payload();
+        if (payload == null) {
+            return null;
+        }
+        return scheme.deserializeMessageWithMetadata(payload, partition, offset);
+    }
+
+
+    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitionInfos, int totalTasks, int taskIndex) {
+        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less than total tasks");
+        List<Partition> taskPartitions = new ArrayList<Partition>();
+        List<Partition> partitions = new ArrayList<Partition>();
+        for (GlobalPartitionInformation partitionInformation : partitionInfos) {
+            partitions.addAll(partitionInformation.getOrderedPartitions());
+        }
+        int numPartitions = partitions.size();
+        if (numPartitions < totalTasks) {
+            LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
+        }
+        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
+            Partition taskPartition = partitions.get(i);
+            taskPartitions.add(taskPartition);
+        }
+        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
+        return taskPartitions;
+    }
+
+    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
+        String taskPrefix = taskId(taskIndex, totalTasks);
+        if (taskPartitions.isEmpty()) {
+            LOG.warn(taskPrefix + "no partitions assigned");
+        } else {
+            LOG.info(taskPrefix + "assigned " + taskPartitions);
+        }
+    }
+
+    public static String taskId(int taskIndex, int totalTasks) {
+        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
+    }
+}
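
For context on calculatePartitionsForTask above: partitions are assigned to tasks
round-robin, starting at taskIndex and striding by totalTasks. A self-contained sketch
of just that arithmetic (partition objects elided):

    public class RoundRobinAssignmentDemo {
        public static void main(String[] args) {
            int numPartitions = 5;
            int totalTasks = 2;
            for (int taskIndex = 0; taskIndex < totalTasks; taskIndex++) {
                StringBuilder assigned = new StringBuilder();
                // Same stride as the loop in calculatePartitionsForTask.
                for (int i = taskIndex; i < numPartitions; i += totalTasks) {
                    assigned.append(i).append(' ');
                }
                System.out.println("Task [" + (taskIndex + 1) + "/" + totalTasks
                        + "] partitions: " + assigned.toString().trim());
            }
            // Task [1/2] partitions: 0 2 4
            // Task [2/2] partitions: 1 3
        }
    }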

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
new file mode 100644
index 0000000..3f9acc2
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.Scheme;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public interface KeyValueScheme extends Scheme {
+    List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
+}
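
A minimal sketch of an implementation, assuming UTF-8 encoded keys and values; the class
and field names here are illustrative only:

    import org.apache.storm.kafka.KeyValueScheme;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public class Utf8KeyValueScheme implements KeyValueScheme {
        @Override
        public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value) {
            String k = (key == null) ? null : StandardCharsets.UTF_8.decode(key).toString();
            return new Values(k, StandardCharsets.UTF_8.decode(value).toString());
        }

        @Override
        public List<Object> deserialize(ByteBuffer ser) {
            // Messages without a key fall back to the plain Scheme path
            // (see the instanceof branch in KafkaUtils.generateTuples).
            return new Values(null, StandardCharsets.UTF_8.decode(ser).toString());
        }

        @Override
        public Fields getOutputFields() {
            return new Fields("key", "value");
        }
    }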

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
new file mode 100644
index 0000000..25053dd
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.SchemeAsMultiScheme;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme {
+
+    public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
+        super(scheme);
+    }
+
+    public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, final ByteBuffer value) {
+        List<Object> o = ((KeyValueScheme) scheme).deserializeKeyAndValue(key, value);
+        if (o == null) {
+            return null;
+        } else {
+            return Arrays.asList(o);
+        }
+    }
+
+}
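
Usage is then a one-liner on the spout config (building on the hypothetical
Utf8KeyValueScheme sketch above); the instanceof check in KafkaUtils.generateTuples
selects the key-and-value path only when the configured scheme is this wrapper type:

    // KafkaConfig.scheme is a MultiScheme field, so the wrapper drops straight in.
    spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new Utf8KeyValueScheme());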

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
new file mode 100644
index 0000000..d0fc08e
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.Scheme;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+public interface MessageMetadataScheme extends Scheme {
+    List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset);
+}
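
A minimal sketch of an implementation, assuming UTF-8 payloads; emitting the source
partition and offset alongside the message lets downstream bolts deduplicate or
checkpoint (class and field names are illustrative):

    import org.apache.storm.kafka.MessageMetadataScheme;
    import org.apache.storm.kafka.Partition;
    import org.apache.storm.tuple.Fields;
    import org.apache.storm.tuple.Values;

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import java.util.List;

    public class Utf8MessageMetadataScheme implements MessageMetadataScheme {
        @Override
        public List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
            return new Values(StandardCharsets.UTF_8.decode(message).toString(), partition.partition, offset);
        }

        @Override
        public List<Object> deserialize(ByteBuffer ser) {
            // The plain Scheme path has no partition/offset context.
            return new Values(StandardCharsets.UTF_8.decode(ser).toString(), null, null);
        }

        @Override
        public Fields getOutputFields() {
            return new Fields("message", "partition", "offset");
        }
    }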

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
new file mode 100644
index 0000000..a53fa88
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.storm.spout.SchemeAsMultiScheme;
+
+public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
+    private static final long serialVersionUID = -7172403703813625116L;
+
+    public MessageMetadataSchemeAsMultiScheme(MessageMetadataScheme scheme) {
+        super(scheme);
+    }
+
+    public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
+        List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset);
+        if (o == null) {
+            return null;
+        } else {
+            return Arrays.asList(o);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
new file mode 100644
index 0000000..afdf8af
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import com.google.common.base.Objects;
+import org.apache.storm.trident.spout.ISpoutPartition;
+
+
+public class Partition implements ISpoutPartition {
+
+    public Broker host;
+    public int partition;
+    public String topic;
+
+    // Flag to keep the partition path id backward compatible with the old implementation, where Partition.getId() == "partition_" + partition
+    private Boolean bUseTopicNameForPartitionPathId;
+
+    // for kryo compatibility
+    private Partition() {
+    }
+
+    public Partition(Broker host, String topic, int partition) {
+        this.topic = topic;
+        this.host = host;
+        this.partition = partition;
+        this.bUseTopicNameForPartitionPathId = false;
+    }
+    
+    public Partition(Broker host, String topic, int partition, Boolean bUseTopicNameForPartitionPathId) {
+        this.topic = topic;
+        this.host = host;
+        this.partition = partition;
+        this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(host, topic, partition);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final Partition other = (Partition) obj;
+        return Objects.equal(this.host, other.host) && Objects.equal(this.topic, other.topic) && Objects.equal(this.partition, other.partition);
+    }
+
+    @Override
+    public String toString() {
+        return "Partition{" +
+                "host=" + host +
+                ", topic=" + topic +
+                ", partition=" + partition +
+                '}';
+    }
+
+    @Override
+    public String getId() {
+        if (bUseTopicNameForPartitionPathId) {
+            return topic + "/partition_" + partition;
+        } else {
+            // Keep the partition id backward compatible with the old implementation, where getId() == "partition_" + partition
+            return "partition_" + partition;
+        }
+    }
+
+}
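
For context, the two path-id forms that getId() can produce (and that the metricPath
normalization in KafkaUtils.KafkaOffsetMetric compensates for), using a hypothetical
broker address and topic:

    Broker broker = new Broker("localhost", 9092);
    Partition legacy = new Partition(broker, "events", 3);        // bUseTopicNameForPartitionPathId == false
    Partition scoped = new Partition(broker, "events", 3, true);
    System.out.println(legacy.getId());   // partition_3          (backward-compatible form)
    System.out.println(scoped.getId());   // events/partition_3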

http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
new file mode 100644
index 0000000..c9004fa
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.util.List;
+
+public interface PartitionCoordinator {
+    List<PartitionManager> getMyManagedPartitions();
+
+    PartitionManager getManager(Partition partition);
+
+    void refresh();
+}