Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:42 UTC

[43/50] [abbrv] git commit: Add storm-kafka as an external module.

Add storm-kafka as an external module.


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7d1bf2a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7d1bf2a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7d1bf2a9

Branch: refs/heads/master
Commit: 7d1bf2a923f8416197f81b467d40f959b2f4f997
Parents: 38ea0ca 8fafbad
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Apr 9 10:26:08 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Apr 9 10:26:08 2014 -0400

----------------------------------------------------------------------
 external/storm-kafka/CHANGELOG.md               |  10 +
 external/storm-kafka/LICENSE                    | 191 ++++++++++++++
 external/storm-kafka/README.md                  |  22 ++
 external/storm-kafka/pom.xml                    | 138 ++++++++++
 .../storm-kafka/src/jvm/storm/kafka/Broker.java |  63 +++++
 .../src/jvm/storm/kafka/BrokerHosts.java        |  11 +
 .../jvm/storm/kafka/DynamicBrokersReader.java   | 124 +++++++++
 .../kafka/DynamicPartitionConnections.java      |  77 ++++++
 .../jvm/storm/kafka/FailedFetchException.java   |  12 +
 .../src/jvm/storm/kafka/KafkaConfig.java        |  33 +++
 .../src/jvm/storm/kafka/KafkaError.java         |  30 +++
 .../src/jvm/storm/kafka/KafkaSpout.java         | 173 +++++++++++++
 .../src/jvm/storm/kafka/KafkaUtils.java         | 218 ++++++++++++++++
 .../src/jvm/storm/kafka/KeyValueScheme.java     |  11 +
 .../kafka/KeyValueSchemeAsMultiScheme.java      |  19 ++
 .../src/jvm/storm/kafka/Partition.java          |  47 ++++
 .../jvm/storm/kafka/PartitionCoordinator.java   |   9 +
 .../src/jvm/storm/kafka/PartitionManager.java   | 224 ++++++++++++++++
 .../src/jvm/storm/kafka/SpoutConfig.java        |  19 ++
 .../src/jvm/storm/kafka/StaticCoordinator.java  |  31 +++
 .../src/jvm/storm/kafka/StaticHosts.java        |  21 ++
 .../storm/kafka/StaticPartitionConnections.java |  35 +++
 .../jvm/storm/kafka/StringKeyValueScheme.java   |  20 ++
 .../src/jvm/storm/kafka/StringScheme.java       |  29 +++
 .../src/jvm/storm/kafka/ZkCoordinator.java      |  95 +++++++
 .../src/jvm/storm/kafka/ZkHosts.java            |  22 ++
 .../src/jvm/storm/kafka/ZkState.java            |  99 +++++++
 .../src/jvm/storm/kafka/bolt/KafkaBolt.java     |  72 ++++++
 .../jvm/storm/kafka/trident/Coordinator.java    |  37 +++
 .../storm/kafka/trident/DefaultCoordinator.java |  14 +
 .../trident/GlobalPartitionInformation.java     |  85 ++++++
 .../storm/kafka/trident/IBatchCoordinator.java  |   9 +
 .../jvm/storm/kafka/trident/IBrokerReader.java  |   8 +
 .../src/jvm/storm/kafka/trident/MaxMetric.java  |  23 ++
 .../kafka/trident/OpaqueTridentKafkaSpout.java  |  42 +++
 .../storm/kafka/trident/StaticBrokerReader.java |  19 ++
 .../trident/TransactionalTridentKafkaSpout.java |  41 +++
 .../storm/kafka/trident/TridentKafkaConfig.java |  20 ++
 .../kafka/trident/TridentKafkaEmitter.java      | 256 +++++++++++++++++++
 .../jvm/storm/kafka/trident/ZkBrokerReader.java |  45 ++++
 .../storm/kafka/DynamicBrokersReaderTest.java   | 155 +++++++++++
 .../src/test/storm/kafka/KafkaErrorTest.java    |  39 +++
 .../src/test/storm/kafka/KafkaTestBroker.java   |  53 ++++
 .../src/test/storm/kafka/KafkaUtilsTest.java    | 221 ++++++++++++++++
 .../storm/kafka/StringKeyValueSchemeTest.java   |  38 +++
 .../src/test/storm/kafka/TestUtils.java         |  20 ++
 .../src/test/storm/kafka/ZkCoordinatorTest.java | 130 ++++++++++
 .../test/storm/kafka/bolt/KafkaBoltTest.java    | 171 +++++++++++++
 pom.xml                                         |   1 +
 49 files changed, 3282 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/CHANGELOG.md
----------------------------------------------------------------------
diff --cc external/storm-kafka/CHANGELOG.md
index 0000000,0000000..f876421
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/CHANGELOG.md
@@@ -1,0 -1,0 +1,10 @@@
++## 0.5.0
++* fixed partition assignment for KafkaSpout
++* upgraded to storm 0.9.1
++## 0.4.0
++* added support for reading kafka message keys
++* configurable metrics emit interval
++## 0.3.0
++* updated partition path in zookeeper
++* added error handling for fetch request
++

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/LICENSE
----------------------------------------------------------------------
diff --cc external/storm-kafka/LICENSE
index 0000000,0000000..37ec93a
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/LICENSE
@@@ -1,0 -1,0 +1,191 @@@
++Apache License
++Version 2.0, January 2004
++http://www.apache.org/licenses/
++
++TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
++
++1. Definitions.
++
++"License" shall mean the terms and conditions for use, reproduction, and
++distribution as defined by Sections 1 through 9 of this document.
++
++"Licensor" shall mean the copyright owner or entity authorized by the copyright
++owner that is granting the License.
++
++"Legal Entity" shall mean the union of the acting entity and all other entities
++that control, are controlled by, or are under common control with that entity.
++For the purposes of this definition, "control" means (i) the power, direct or
++indirect, to cause the direction or management of such entity, whether by
++contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
++outstanding shares, or (iii) beneficial ownership of such entity.
++
++"You" (or "Your") shall mean an individual or Legal Entity exercising
++permissions granted by this License.
++
++"Source" form shall mean the preferred form for making modifications, including
++but not limited to software source code, documentation source, and configuration
++files.
++
++"Object" form shall mean any form resulting from mechanical transformation or
++translation of a Source form, including but not limited to compiled object code,
++generated documentation, and conversions to other media types.
++
++"Work" shall mean the work of authorship, whether in Source or Object form, made
++available under the License, as indicated by a copyright notice that is included
++in or attached to the work (an example is provided in the Appendix below).
++
++"Derivative Works" shall mean any work, whether in Source or Object form, that
++is based on (or derived from) the Work and for which the editorial revisions,
++annotations, elaborations, or other modifications represent, as a whole, an
++original work of authorship. For the purposes of this License, Derivative Works
++shall not include works that remain separable from, or merely link (or bind by
++name) to the interfaces of, the Work and Derivative Works thereof.
++
++"Contribution" shall mean any work of authorship, including the original version
++of the Work and any modifications or additions to that Work or Derivative Works
++thereof, that is intentionally submitted to Licensor for inclusion in the Work
++by the copyright owner or by an individual or Legal Entity authorized to submit
++on behalf of the copyright owner. For the purposes of this definition,
++"submitted" means any form of electronic, verbal, or written communication sent
++to the Licensor or its representatives, including but not limited to
++communication on electronic mailing lists, source code control systems, and
++issue tracking systems that are managed by, or on behalf of, the Licensor for
++the purpose of discussing and improving the Work, but excluding communication
++that is conspicuously marked or otherwise designated in writing by the copyright
++owner as "Not a Contribution."
++
++"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
++of whom a Contribution has been received by Licensor and subsequently
++incorporated within the Work.
++
++2. Grant of Copyright License.
++
++Subject to the terms and conditions of this License, each Contributor hereby
++grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
++irrevocable copyright license to reproduce, prepare Derivative Works of,
++publicly display, publicly perform, sublicense, and distribute the Work and such
++Derivative Works in Source or Object form.
++
++3. Grant of Patent License.
++
++Subject to the terms and conditions of this License, each Contributor hereby
++grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
++irrevocable (except as stated in this section) patent license to make, have
++made, use, offer to sell, sell, import, and otherwise transfer the Work, where
++such license applies only to those patent claims licensable by such Contributor
++that are necessarily infringed by their Contribution(s) alone or by combination
++of their Contribution(s) with the Work to which such Contribution(s) was
++submitted. If You institute patent litigation against any entity (including a
++cross-claim or counterclaim in a lawsuit) alleging that the Work or a
++Contribution incorporated within the Work constitutes direct or contributory
++patent infringement, then any patent licenses granted to You under this License
++for that Work shall terminate as of the date such litigation is filed.
++
++4. Redistribution.
++
++You may reproduce and distribute copies of the Work or Derivative Works thereof
++in any medium, with or without modifications, and in Source or Object form,
++provided that You meet the following conditions:
++
++You must give any other recipients of the Work or Derivative Works a copy of
++this License; and
++You must cause any modified files to carry prominent notices stating that You
++changed the files; and
++You must retain, in the Source form of any Derivative Works that You distribute,
++all copyright, patent, trademark, and attribution notices from the Source form
++of the Work, excluding those notices that do not pertain to any part of the
++Derivative Works; and
++If the Work includes a "NOTICE" text file as part of its distribution, then any
++Derivative Works that You distribute must include a readable copy of the
++attribution notices contained within such NOTICE file, excluding those notices
++that do not pertain to any part of the Derivative Works, in at least one of the
++following places: within a NOTICE text file distributed as part of the
++Derivative Works; within the Source form or documentation, if provided along
++with the Derivative Works; or, within a display generated by the Derivative
++Works, if and wherever such third-party notices normally appear. The contents of
++the NOTICE file are for informational purposes only and do not modify the
++License. You may add Your own attribution notices within Derivative Works that
++You distribute, alongside or as an addendum to the NOTICE text from the Work,
++provided that such additional attribution notices cannot be construed as
++modifying the License.
++You may add Your own copyright statement to Your modifications and may provide
++additional or different license terms and conditions for use, reproduction, or
++distribution of Your modifications, or for any such Derivative Works as a whole,
++provided Your use, reproduction, and distribution of the Work otherwise complies
++with the conditions stated in this License.
++
++5. Submission of Contributions.
++
++Unless You explicitly state otherwise, any Contribution intentionally submitted
++for inclusion in the Work by You to the Licensor shall be under the terms and
++conditions of this License, without any additional terms or conditions.
++Notwithstanding the above, nothing herein shall supersede or modify the terms of
++any separate license agreement you may have executed with Licensor regarding
++such Contributions.
++
++6. Trademarks.
++
++This License does not grant permission to use the trade names, trademarks,
++service marks, or product names of the Licensor, except as required for
++reasonable and customary use in describing the origin of the Work and
++reproducing the content of the NOTICE file.
++
++7. Disclaimer of Warranty.
++
++Unless required by applicable law or agreed to in writing, Licensor provides the
++Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
++WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
++including, without limitation, any warranties or conditions of TITLE,
++NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
++solely responsible for determining the appropriateness of using or
++redistributing the Work and assume any risks associated with Your exercise of
++permissions under this License.
++
++8. Limitation of Liability.
++
++In no event and under no legal theory, whether in tort (including negligence),
++contract, or otherwise, unless required by applicable law (such as deliberate
++and grossly negligent acts) or agreed to in writing, shall any Contributor be
++liable to You for damages, including any direct, indirect, special, incidental,
++or consequential damages of any character arising as a result of this License or
++out of the use or inability to use the Work (including but not limited to
++damages for loss of goodwill, work stoppage, computer failure or malfunction, or
++any and all other commercial damages or losses), even if such Contributor has
++been advised of the possibility of such damages.
++
++9. Accepting Warranty or Additional Liability.
++
++While redistributing the Work or Derivative Works thereof, You may choose to
++offer, and charge a fee for, acceptance of support, warranty, indemnity, or
++other liability obligations and/or rights consistent with this License. However,
++in accepting such obligations, You may act only on Your own behalf and on Your
++sole responsibility, not on behalf of any other Contributor, and only if You
++agree to indemnify, defend, and hold each Contributor harmless for any liability
++incurred by, or claims asserted against, such Contributor by reason of your
++accepting any such warranty or additional liability.
++
++END OF TERMS AND CONDITIONS
++
++APPENDIX: How to apply the Apache License to your work
++
++To apply the Apache License to your work, attach the following boilerplate
++notice, with the fields enclosed by brackets "[]" replaced with your own
++identifying information. (Don't include the brackets!) The text should be
++enclosed in the appropriate comment syntax for the file format. We also
++recommend that a file or class name and description of purpose be included on
++the same "printed page" as the copyright notice for easier identification within
++third-party archives.
++
++   Copyright [yyyy] [name of copyright owner]
++
++   Licensed under the Apache License, Version 2.0 (the "License");
++   you may not use this file except in compliance with the License.
++   You may obtain a copy of the License at
++
++     http://www.apache.org/licenses/LICENSE-2.0
++
++   Unless required by applicable law or agreed to in writing, software
++   distributed under the License is distributed on an "AS IS" BASIS,
++   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++   See the License for the specific language governing permissions and
++   limitations under the License.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --cc external/storm-kafka/README.md
index 0000000,0000000..874db01
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/README.md
@@@ -1,0 -1,0 +1,22 @@@
++storm-kafka-0.8-plus
++====================
++
++Port of storm-kafka to support kafka >= 0.8
++
++##Usage:
++For information on how to use this library in your project see:
++
++[https://clojars.org/net.wurstmeister.storm/storm-kafka-0.8-plus](https://clojars.org/net.wurstmeister.storm/storm-kafka-0.8-plus)
++
++
++##Example Topologies:
++
++[https://github.com/wurstmeister/storm-kafka-0.8-plus-test](https://github.com/wurstmeister/storm-kafka-0.8-plus-test)
++
++##Acknowledgement:
++
++YourKit is kindly supporting this open source project with its full-featured Java Profiler.
++YourKit, LLC is the creator of innovative and intelligent tools for profiling
++Java and .NET applications. Take a look at YourKit's leading software products:
++<a href="http://www.yourkit.com/java/profiler/index.jsp">YourKit Java Profiler</a> and
++<a href="http://www.yourkit.com/.net/profiler/index.jsp">YourKit .NET Profiler</a>.

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --cc external/storm-kafka/pom.xml
index 0000000,0000000..15743b6
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/pom.xml
@@@ -1,0 -1,0 +1,138 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements.  See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License.  You may obtain a copy of the License at
++
++     http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
++         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++    <modelVersion>4.0.0</modelVersion>
++	
++    <parent>
++        <artifactId>storm</artifactId>
++        <groupId>org.apache.storm</groupId>
++        <version>0.9.2-incubating-SNAPSHOT</version>
++        <relativePath>../../pom.xml</relativePath>
++    </parent>
++	
++    <packaging>jar</packaging>
++	<artifactId>storm-kafka</artifactId>
++    <name>storm-kafka</name>
++    <description>Storm Spouts for Apache Kafka</description>
++    <properties>
++        <scalaVersion>2.9.2</scalaVersion>
++        <kafkaArtifact>kafka_2.9.2</kafkaArtifact>
++        <envClassifier></envClassifier>
++    </properties>
++    <build>
++        <plugins>
++
++        </plugins>
++        <sourceDirectory>src/jvm</sourceDirectory>
++        <testSourceDirectory>src/test</testSourceDirectory>
++    </build>
++    <dependencies>
++        <dependency>
++            <groupId>org.mockito</groupId>
++            <artifactId>mockito-all</artifactId>
++            <version>1.9.0</version>
++            <scope>test</scope>
++        </dependency>
++        <dependency>
++            <groupId>org.scala-lang</groupId>
++            <artifactId>scala-library</artifactId>
++            <version>${scalaVersion}</version>
++        </dependency>
++        <dependency>
++            <groupId>junit</groupId>
++            <artifactId>junit</artifactId>
++            <version>4.11</version>
++            <scope>test</scope>
++        </dependency>
++        <dependency>
++            <groupId>com.netflix.curator</groupId>
++            <artifactId>curator-framework</artifactId>
++            <version>1.3.3</version>
++            <exclusions>
++                <exclusion>
++                    <groupId>log4j</groupId>
++                    <artifactId>log4j</artifactId>
++                </exclusion>
++                <exclusion>
++                    <groupId>org.slf4j</groupId>
++                    <artifactId>slf4j-log4j12</artifactId>
++                </exclusion>
++            </exclusions>
++        </dependency>
++        <dependency>
++            <groupId>com.netflix.curator</groupId>
++            <artifactId>curator-recipes</artifactId>
++            <version>1.3.3</version>
++            <exclusions>
++                <exclusion>
++                    <groupId>log4j</groupId>
++                    <artifactId>log4j</artifactId>
++                </exclusion>
++            </exclusions>
++            <scope>test</scope>
++        </dependency>
++        <dependency>
++            <groupId>com.netflix.curator</groupId>
++            <artifactId>curator-test</artifactId>
++            <version>1.3.3</version>
++            <exclusions>
++                <exclusion>
++                    <groupId>log4j</groupId>
++                    <artifactId>log4j</artifactId>
++                </exclusion>
++                <exclusion>
++                    <groupId>org.testng</groupId>
++                    <artifactId>testng</artifactId>
++                </exclusion>
++            </exclusions>
++            <scope>test</scope>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.kafka</groupId>
++            <artifactId>${kafkaArtifact}</artifactId>
++            <version>0.8.0</version>
++            <exclusions>
++                <exclusion>
++                    <groupId>org.apache.zookeeper</groupId>
++                    <artifactId>zookeeper</artifactId>
++                </exclusion>
++                <exclusion>
++                    <groupId>log4j</groupId>
++                    <artifactId>log4j</artifactId>
++                </exclusion>
++            </exclusions>
++        </dependency>
++        <dependency>
++            <groupId>org.apache.storm</groupId>
++            <artifactId>storm-core</artifactId>
++            <version>${project.version}</version>
++            <scope>provided</scope>
++        </dependency>
++    </dependencies>
++    <profiles>
++        <profile>
++            <id>Scala-2.10</id>
++            <properties>
++                <scalaVersion>2.10.3</scalaVersion>
++                <kafkaArtifact>kafka_2.10</kafkaArtifact>
++                <envClassifier>scala_2.10</envClassifier>
++            </properties>
++        </profile>
++    </profiles>
++</project>

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/Broker.java
index 0000000,0000000..2451eee
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/Broker.java
@@@ -1,0 -1,0 +1,63 @@@
++package storm.kafka;
++
++import java.io.Serializable;
++import com.google.common.base.Objects;
++
++public class Broker implements Serializable, Comparable<Broker> {
++    public final String host;
++    public final int port;
++
++    public Broker(String host, int port) {
++        this.host = host;
++        this.port = port;
++    }
++
++    public Broker(String host) {
++        this(host, 9092);
++    }
++
++    @Override
++    public int hashCode() {
++        return Objects.hashCode(host, port);
++    }
++
++    @Override
++    public boolean equals(Object obj) {
++        if (this == obj) {
++            return true;
++        }
++        if (obj == null || getClass() != obj.getClass()) {
++            return false;
++        }
++        final Broker other = (Broker) obj;
++        return Objects.equal(this.host, other.host) && Objects.equal(this.port, other.port);
++    }
++
++    @Override
++    public String toString() {
++        return host + ":" + port;
++    }
++
++    public static Broker fromString(String host) {
++        Broker hp;
++        String[] spec = host.split(":");
++        if (spec.length == 1) {
++            hp = new Broker(spec[0]);
++        } else if (spec.length == 2) {
++            hp = new Broker(spec[0], Integer.parseInt(spec[1]));
++        } else {
++            throw new IllegalArgumentException("Invalid host specification: " + host);
++        }
++        return hp;
++    }
++
++
++    @Override
++    public int compareTo(Broker o) {
++        if (this.host.equals(o.host)) {
++            return this.port - o.port;
++        } else {
++            return this.host.compareTo(o.host);
++        }
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
index 0000000,0000000..12ef7b1
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
@@@ -1,0 -1,0 +1,11 @@@
++package storm.kafka;
++
++import java.io.Serializable;
++
++/**
++ * Date: 11/05/2013
++ * Time: 14:40
++ */
++public interface BrokerHosts extends Serializable {
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
index 0000000,0000000..cd751fe
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
@@@ -1,0 -1,0 +1,124 @@@
++package storm.kafka;
++
++import backtype.storm.Config;
++import backtype.storm.utils.Utils;
++import com.netflix.curator.framework.CuratorFramework;
++import com.netflix.curator.framework.CuratorFrameworkFactory;
++import com.netflix.curator.retry.RetryNTimes;
++import org.json.simple.JSONValue;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.trident.GlobalPartitionInformation;
++
++import java.io.IOException;
++import java.io.UnsupportedEncodingException;
++import java.util.List;
++import java.util.Map;
++
++public class DynamicBrokersReader {
++
++    public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
++
++    private CuratorFramework _curator;
++    private String _zkPath;
++    private String _topic;
++
++    public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
++        _zkPath = zkPath;
++        _topic = topic;
++        _curator = CuratorFrameworkFactory.newClient(
++                zkStr,
++                Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
++                15000,
++                new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
++                        Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
++        _curator.start();
++    }
++
++    /**
++     * Get all partitions with their current leaders
++     */
++    public GlobalPartitionInformation getBrokerInfo() {
++        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
++        try {
++            int numPartitionsForTopic = getNumPartitions();
++            String brokerInfoPath = brokerPath();
++            for (int partition = 0; partition < numPartitionsForTopic; partition++) {
++                int leader = getLeaderFor(partition);
++                String path = brokerInfoPath + "/" + leader;
++                try {
++                    byte[] brokerData = _curator.getData().forPath(path);
++                    Broker hp = getBrokerHost(brokerData);
++                    globalPartitionInformation.addPartition(partition, hp);
++                } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
++                    LOG.error("Node {} does not exist ", path);
++                }
++            }
++        } catch (Exception e) {
++            throw new RuntimeException(e);
++        }
++        LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
++        return globalPartitionInformation;
++    }
++
++
++    private int getNumPartitions() {
++        try {
++            String topicBrokersPath = partitionPath();
++            List<String> children = _curator.getChildren().forPath(topicBrokersPath);
++            return children.size();
++        } catch (Exception e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++    public String partitionPath() {
++        return _zkPath + "/topics/" + _topic + "/partitions";
++    }
++
++    public String brokerPath() {
++        return _zkPath + "/ids";
++    }
++
++    /**
++     * get /brokers/topics/distributedTopic/partitions/1/state
++     * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
++     *
++     * @param partition
++     * @return
++     */
++    private int getLeaderFor(long partition) {
++        try {
++            String topicBrokersPath = partitionPath();
++            byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
++            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
++            Integer leader = ((Number) value.get("leader")).intValue();
++            return leader;
++        } catch (Exception e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++    public void close() {
++        _curator.close();
++    }
++
++    /**
++     * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
++     * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
++     *
++     * @param contents
++     * @return
++     */
++    private Broker getBrokerHost(byte[] contents) {
++        try {
++            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
++            String host = (String) value.get("host");
++            Integer port = ((Long) value.get("port")).intValue();
++            return new Broker(host, port);
++        } catch (UnsupportedEncodingException e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++}
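
For context on the ZooKeeper layout documented in the comments above (/brokers/ids/<id> and /brokers/topics/<topic>/partitions/<n>/state), here is a hedged sketch of reading the partition-to-leader map directly with this class; the connection string, chroot, topic, and timeout values are placeholders.

    import backtype.storm.Config;
    import storm.kafka.DynamicBrokersReader;
    import storm.kafka.trident.GlobalPartitionInformation;

    import java.util.HashMap;
    import java.util.Map;

    public class BrokerInfoSketch {
        public static void main(String[] args) {
            Map conf = new HashMap();
            conf.put(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT, 20000);
            conf.put(Config.STORM_ZOOKEEPER_RETRY_TIMES, 5);
            conf.put(Config.STORM_ZOOKEEPER_RETRY_INTERVAL, 1000);

            // "/brokers" is Kafka's default chroot; brokerPath() and partitionPath()
            // resolve to /brokers/ids and /brokers/topics/my-topic/partitions under it.
            DynamicBrokersReader reader =
                    new DynamicBrokersReader(conf, "zkhost:2181", "/brokers", "my-topic");
            GlobalPartitionInformation partitions = reader.getBrokerInfo();
            System.out.println(partitions);
            reader.close();
        }
    }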

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
index 0000000,0000000..8d0115b
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
@@@ -1,0 -1,0 +1,77 @@@
++package storm.kafka;
++
++import kafka.javaapi.consumer.SimpleConsumer;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.trident.IBrokerReader;
++
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Set;
++
++
++public class DynamicPartitionConnections {
++
++    public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
++
++    static class ConnectionInfo {
++        SimpleConsumer consumer;
++        Set<Integer> partitions = new HashSet();
++
++        public ConnectionInfo(SimpleConsumer consumer) {
++            this.consumer = consumer;
++        }
++    }
++
++    Map<Broker, ConnectionInfo> _connections = new HashMap();
++    KafkaConfig _config;
++    IBrokerReader _reader;
++
++    public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) {
++        _config = config;
++        _reader = brokerReader;
++    }
++
++    public SimpleConsumer register(Partition partition) {
++        Broker broker = _reader.getCurrentBrokers().getBrokerFor(partition.partition);
++        return register(broker, partition.partition);
++    }
++
++    public SimpleConsumer register(Broker host, int partition) {
++        if (!_connections.containsKey(host)) {
++            _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
++        }
++        ConnectionInfo info = _connections.get(host);
++        info.partitions.add(partition);
++        return info.consumer;
++    }
++
++    public SimpleConsumer getConnection(Partition partition) {
++        ConnectionInfo info = _connections.get(partition.host);
++        if (info != null) {
++            return info.consumer;
++        }
++        return null;
++    }
++
++    public void unregister(Broker port, int partition) {
++        ConnectionInfo info = _connections.get(port);
++        info.partitions.remove(partition);
++        if (info.partitions.isEmpty()) {
++            info.consumer.close();
++            _connections.remove(port);
++        }
++    }
++
++    public void unregister(Partition partition) {
++        unregister(partition.host, partition.partition);
++    }
++
++    public void clear() {
++        for (ConnectionInfo info : _connections.values()) {
++            info.consumer.close();
++        }
++        _connections.clear();
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
index 0000000,0000000..0bd1123
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
@@@ -1,0 -1,0 +1,12 @@@
++package storm.kafka;
++
++public class FailedFetchException extends RuntimeException {
++
++    public FailedFetchException(String message) {
++        super(message);
++    }
++
++    public FailedFetchException(Exception e) {
++        super(e);
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index 0000000,0000000..8ef2a88
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@@ -1,0 -1,0 +1,33 @@@
++package storm.kafka;
++
++import backtype.storm.spout.MultiScheme;
++import backtype.storm.spout.RawMultiScheme;
++
++import java.io.Serializable;
++
++public class KafkaConfig implements Serializable {
++
++    public final BrokerHosts hosts;
++    public final String topic;
++    public final String clientId;
++
++    public int fetchSizeBytes = 1024 * 1024;
++    public int socketTimeoutMs = 10000;
++    public int bufferSizeBytes = 1024 * 1024;
++    public MultiScheme scheme = new RawMultiScheme();
++    public boolean forceFromStart = false;
++    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
++    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
++    public int metricsTimeBucketSizeInSecs = 60;
++
++    public KafkaConfig(BrokerHosts hosts, String topic) {
++        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
++    }
++
++    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
++        this.hosts = hosts;
++        this.topic = topic;
++        this.clientId = clientId;
++    }
++
++}
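
As a quick illustration of the fields above, a hedged sketch of tuning a KafkaConfig; the ZkHosts constructor is assumed and all values are placeholders, not recommendations.

    import storm.kafka.BrokerHosts;
    import storm.kafka.KafkaConfig;
    import storm.kafka.ZkHosts;

    public class KafkaConfigSketch {
        public static void main(String[] args) {
            BrokerHosts hosts = new ZkHosts("zkhost:2181");          // assumed constructor
            KafkaConfig config = new KafkaConfig(hosts, "my-topic"); // clientId falls back to Kafka's default
            config.fetchSizeBytes = 2 * 1024 * 1024;      // pull up to 2 MB per fetch request
            config.bufferSizeBytes = 2 * 1024 * 1024;     // size the consumer's socket buffer to match
            config.forceFromStart = true;                 // start from startOffsetTime instead of the latest offset
            config.startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
            config.metricsTimeBucketSizeInSecs = 30;      // emit the kafkaOffset/kafkaPartition metrics every 30s
        }
    }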

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
index 0000000,0000000..a67335c
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
@@@ -1,0 -1,0 +1,30 @@@
++package storm.kafka;
++
++/**
++ * Date: 11/01/2014
++ * Time: 14:21
++ */
++public enum KafkaError {
++    NO_ERROR,
++    OFFSET_OUT_OF_RANGE,
++    INVALID_MESSAGE,
++    UNKNOWN_TOPIC_OR_PARTITION,
++    INVALID_FETCH_SIZE,
++    LEADER_NOT_AVAILABLE,
++    NOT_LEADER_FOR_PARTITION,
++    REQUEST_TIMED_OUT,
++    BROKER_NOT_AVAILABLE,
++    REPLICA_NOT_AVAILABLE,
++    MESSAGE_SIZE_TOO_LARGE,
++    STALE_CONTROLLER_EPOCH,
++    OFFSET_METADATA_TOO_LARGE,
++    UNKNOWN;
++
++    public static KafkaError getError(int errorCode) {
++        if (errorCode < 0 || errorCode >= UNKNOWN.ordinal()) {
++            return UNKNOWN;
++        } else {
++            return values()[errorCode];
++        }
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
index 0000000,0000000..79e33fe
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@@ -1,0 -1,0 +1,173 @@@
++package storm.kafka;
++
++import backtype.storm.Config;
++import backtype.storm.metric.api.IMetric;
++import backtype.storm.spout.SpoutOutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseRichSpout;
++import kafka.message.Message;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.PartitionManager.KafkaMessageId;
++
++import java.util.*;
++
++// TODO: need to add blacklisting
++// TODO: need to make a best effort to not re-emit messages if don't have to
++public class KafkaSpout extends BaseRichSpout {
++    public static class MessageAndRealOffset {
++        public Message msg;
++        public long offset;
++
++        public MessageAndRealOffset(Message msg, long offset) {
++            this.msg = msg;
++            this.offset = offset;
++        }
++    }
++
++    static enum EmitState {
++        EMITTED_MORE_LEFT,
++        EMITTED_END,
++        NO_EMITTED
++    }
++
++    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
++
++    String _uuid = UUID.randomUUID().toString();
++    SpoutConfig _spoutConfig;
++    SpoutOutputCollector _collector;
++    PartitionCoordinator _coordinator;
++    DynamicPartitionConnections _connections;
++    ZkState _state;
++
++    long _lastUpdateMs = 0;
++
++    int _currPartitionIndex = 0;
++
++    public KafkaSpout(SpoutConfig spoutConf) {
++        _spoutConfig = spoutConf;
++    }
++
++    @Override
++    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
++        _collector = collector;
++
++        Map stateConf = new HashMap(conf);
++        List<String> zkServers = _spoutConfig.zkServers;
++        if (zkServers == null) {
++            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
++        }
++        Integer zkPort = _spoutConfig.zkPort;
++        if (zkPort == null) {
++            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
++        }
++        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
++        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
++        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
++        _state = new ZkState(stateConf);
++
++        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
++
++        // using TransactionalState like this is a hack
++        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
++        if (_spoutConfig.hosts instanceof StaticHosts) {
++            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
++        } else {
++            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
++        }
++
++        context.registerMetric("kafkaOffset", new IMetric() {
++            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
++
++            @Override
++            public Object getValueAndReset() {
++                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
++                Set<Partition> latestPartitions = new HashSet();
++                for (PartitionManager pm : pms) {
++                    latestPartitions.add(pm.getPartition());
++                }
++                _kafkaOffsetMetric.refreshPartitions(latestPartitions);
++                for (PartitionManager pm : pms) {
++                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
++                }
++                return _kafkaOffsetMetric.getValueAndReset();
++            }
++        }, _spoutConfig.metricsTimeBucketSizeInSecs);
++
++        context.registerMetric("kafkaPartition", new IMetric() {
++            @Override
++            public Object getValueAndReset() {
++                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
++                Map concatMetricsDataMaps = new HashMap();
++                for (PartitionManager pm : pms) {
++                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
++                }
++                return concatMetricsDataMaps;
++            }
++        }, _spoutConfig.metricsTimeBucketSizeInSecs);
++    }
++
++    @Override
++    public void close() {
++        _state.close();
++    }
++
++    @Override
++    public void nextTuple() {
++        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
++        for (int i = 0; i < managers.size(); i++) {
++
++            // in case the number of managers decreased
++            _currPartitionIndex = _currPartitionIndex % managers.size();
++            EmitState state = managers.get(_currPartitionIndex).next(_collector);
++            if (state != EmitState.EMITTED_MORE_LEFT) {
++                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
++            }
++            if (state != EmitState.NO_EMITTED) {
++                break;
++            }
++        }
++
++        long now = System.currentTimeMillis();
++        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
++            commit();
++        }
++    }
++
++    @Override
++    public void ack(Object msgId) {
++        KafkaMessageId id = (KafkaMessageId) msgId;
++        PartitionManager m = _coordinator.getManager(id.partition);
++        if (m != null) {
++            m.ack(id.offset);
++        }
++    }
++
++    @Override
++    public void fail(Object msgId) {
++        KafkaMessageId id = (KafkaMessageId) msgId;
++        PartitionManager m = _coordinator.getManager(id.partition);
++        if (m != null) {
++            m.fail(id.offset);
++        }
++    }
++
++    @Override
++    public void deactivate() {
++        commit();
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++        declarer.declare(_spoutConfig.scheme.getOutputFields());
++    }
++
++    private void commit() {
++        _lastUpdateMs = System.currentTimeMillis();
++        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
++            manager.commit();
++        }
++    }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 0000000,0000000..0e7f601
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@@ -1,0 -1,0 +1,218 @@@
++package storm.kafka;
++
++import backtype.storm.metric.api.IMetric;
++import backtype.storm.utils.Utils;
++import com.google.common.base.Preconditions;
++import kafka.api.FetchRequest;
++import kafka.api.FetchRequestBuilder;
++import kafka.api.PartitionOffsetRequestInfo;
++import kafka.common.TopicAndPartition;
++import kafka.javaapi.FetchResponse;
++import kafka.javaapi.OffsetRequest;
++import kafka.javaapi.consumer.SimpleConsumer;
++import kafka.javaapi.message.ByteBufferMessageSet;
++import kafka.message.Message;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.trident.GlobalPartitionInformation;
++import storm.kafka.trident.IBrokerReader;
++import storm.kafka.trident.StaticBrokerReader;
++import storm.kafka.trident.ZkBrokerReader;
++
++import java.net.ConnectException;
++import java.nio.ByteBuffer;
++import java.util.*;
++
++
++public class KafkaUtils {
++
++    public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
++    private static final int NO_OFFSET = -5;
++
++
++    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
++        if (conf.hosts instanceof StaticHosts) {
++            return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
++        } else {
++            return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
++        }
++    }
++
++
++    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
++        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
++        if (config.forceFromStart) {
++            startOffsetTime = config.startOffsetTime;
++        }
++        return getOffset(consumer, topic, partition, startOffsetTime);
++    }
++
++    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
++        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
++        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
++        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
++        OffsetRequest request = new OffsetRequest(
++                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
++
++        long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
++        if (offsets.length > 0) {
++            return offsets[0];
++        } else {
++            return NO_OFFSET;
++        }
++    }
++
++    public static class KafkaOffsetMetric implements IMetric {
++        Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
++        Set<Partition> _partitions;
++        String _topic;
++        DynamicPartitionConnections _connections;
++
++        public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
++            _topic = topic;
++            _connections = connections;
++        }
++
++        public void setLatestEmittedOffset(Partition partition, long offset) {
++            _partitionToOffset.put(partition, offset);
++        }
++
++        @Override
++        public Object getValueAndReset() {
++            try {
++                long totalSpoutLag = 0;
++                long totalEarliestTimeOffset = 0;
++                long totalLatestTimeOffset = 0;
++                long totalLatestEmittedOffset = 0;
++                HashMap ret = new HashMap();
++                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
++                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
++                        Partition partition = e.getKey();
++                        SimpleConsumer consumer = _connections.getConnection(partition);
++                        if (consumer == null) {
++                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
++                            return null;
++                        }
++                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
++                        long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
++                        if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
++                            LOG.warn("No data found in Kafka Partition " + partition.getId());
++                            return null;
++                        }
++                        long latestEmittedOffset = e.getValue();
++                        long spoutLag = latestTimeOffset - latestEmittedOffset;
++                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
++                        ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
++                        ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
++                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
++                        totalSpoutLag += spoutLag;
++                        totalEarliestTimeOffset += earliestTimeOffset;
++                        totalLatestTimeOffset += latestTimeOffset;
++                        totalLatestEmittedOffset += latestEmittedOffset;
++                    }
++                    ret.put("totalSpoutLag", totalSpoutLag);
++                    ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
++                    ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
++                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
++                    return ret;
++                } else {
++                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
++                }
++            } catch (Throwable t) {
++                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
++            }
++            return null;
++        }
++
++        public void refreshPartitions(Set<Partition> partitions) {
++            _partitions = partitions;
++            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
++            while (it.hasNext()) {
++                if (!partitions.contains(it.next())) {
++                    it.remove();
++                }
++            }
++        }
++    }
++
++    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
++        ByteBufferMessageSet msgs = null;
++        String topic = config.topic;
++        int partitionId = partition.partition;
++        for (int errors = 0; errors < 2 && msgs == null; errors++) {
++            FetchRequestBuilder builder = new FetchRequestBuilder();
++            FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
++                    clientId(config.clientId).build();
++            FetchResponse fetchResponse;
++            try {
++                fetchResponse = consumer.fetch(fetchRequest);
++            } catch (Exception e) {
++                if (e instanceof ConnectException) {
++                    throw new FailedFetchException(e);
++                } else {
++                    throw new RuntimeException(e);
++                }
++            }
++            if (fetchResponse.hasError()) {
++                KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
++                if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
++                    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
++                    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
++                            "retrying with default start offset time from configuration. " +
++                            "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
++                    offset = startOffset;
++                } else {
++                    String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
++                    LOG.error(message);
++                    throw new FailedFetchException(message);
++                }
++            } else {
++                msgs = fetchResponse.messageSet(topic, partitionId);
++            }
++        }
++        return msgs;
++    }
++
++
++    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
++        Iterable<List<Object>> tups;
++        ByteBuffer payload = msg.payload();
++        ByteBuffer key = msg.key();
++        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
++            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
++        } else {
++            tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
++        }
++        return tups;
++    }
++
++
++    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
++        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
++        List<Partition> partitions = partitionInformation.getOrderedPartitions();
++        int numPartitions = partitions.size();
++        if (numPartitions < totalTasks) {
++            LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
++        }
++        List<Partition> taskPartitions = new ArrayList<Partition>();
++        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
++            Partition taskPartition = partitions.get(i);
++            taskPartitions.add(taskPartition);
++        }
++        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
++        return taskPartitions;
++    }
++
++    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
++        String taskPrefix = taskId(taskIndex, totalTasks);
++        if (taskPartitions.isEmpty()) {
++            LOG.warn(taskPrefix + "no partitions assigned");
++        } else {
++            LOG.info(taskPrefix + "assigned " + taskPartitions);
++        }
++    }
++
++    public static String taskId(int taskIndex, int totalTasks) {
++        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
++    }
++}
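
calculatePartitionsForTask above distributes partitions round-robin, so task i of n ends up with partitions i, i+n, i+2n, and so on, and tasks beyond the partition count sit idle. A small sketch with placeholder brokers and counts:

    import storm.kafka.Broker;
    import storm.kafka.KafkaUtils;
    import storm.kafka.Partition;
    import storm.kafka.trident.GlobalPartitionInformation;

    import java.util.List;

    public class PartitionAssignmentSketch {
        public static void main(String[] args) {
            // Six partitions spread across two placeholder brokers.
            GlobalPartitionInformation info = new GlobalPartitionInformation();
            for (int p = 0; p < 6; p++) {
                info.addPartition(p, new Broker("broker" + (p % 2), 9092));
            }
            // Task 1 of 2 is assigned partitions 1, 3 and 5.
            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(info, 2, 1);
            System.out.println(mine);
        }
    }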

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
index 0000000,0000000..df31cb8
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
@@@ -1,0 -1,0 +1,11 @@@
++package storm.kafka;
++
++import backtype.storm.spout.Scheme;
++
++import java.util.List;
++
++public interface KeyValueScheme extends Scheme {
++
++    public List<Object> deserializeKeyAndValue(byte[] key, byte[] value);
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
index 0000000,0000000..2412a1c
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@@ -1,0 -1,0 +1,19 @@@
++package storm.kafka;
++
++import backtype.storm.spout.SchemeAsMultiScheme;
++import java.util.Arrays;
++import java.util.List;
++
++public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme{
++
++    public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
++        super(scheme);
++    }
++
++    public Iterable<List<Object>> deserializeKeyAndValue(final byte[] key, final byte[] value) {
++        List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value);
++        if(o == null) return null;
++        else return Arrays.asList(o);
++    }
++
++}
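
This wrapper is what lets the spout surface Kafka message keys as well as values (compare the 0.4.0 changelog entry above about reading message keys). A hedged fragment, assuming the StringKeyValueScheme added elsewhere in this commit is a String-based KeyValueScheme with a no-arg constructor:

    import storm.kafka.KeyValueSchemeAsMultiScheme;
    import storm.kafka.SpoutConfig;
    import storm.kafka.StringKeyValueScheme;

    public class KeyValueSchemeSketch {
        // When the configured scheme is a KeyValueSchemeAsMultiScheme, KafkaUtils.generateTuples
        // routes messages with a non-null key through deserializeKeyAndValue instead of deserialize.
        static void enableKeyReading(SpoutConfig spoutConfig) {
            spoutConfig.scheme = new KeyValueSchemeAsMultiScheme(new StringKeyValueScheme());
        }
    }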

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/Partition.java
index 0000000,0000000..96a3ad7
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/Partition.java
@@@ -1,0 -1,0 +1,47 @@@
++package storm.kafka;
++
++import com.google.common.base.Objects;
++import storm.trident.spout.ISpoutPartition;
++
++
++public class Partition implements ISpoutPartition {
++
++    public final Broker host;
++    public final int partition;
++
++    public Partition(Broker host, int partition) {
++        this.host = host;
++        this.partition = partition;
++    }
++
++    @Override
++    public int hashCode() {
++        return Objects.hashCode(host, partition);
++    }
++
++    @Override
++    public boolean equals(Object obj) {
++        if (this == obj) {
++            return true;
++        }
++        if (obj == null || getClass() != obj.getClass()) {
++            return false;
++        }
++        final Partition other = (Partition) obj;
++        return Objects.equal(this.host, other.host) && Objects.equal(this.partition, other.partition);
++    }
++
++    @Override
++    public String toString() {
++        return "Partition{" +
++                "host=" + host +
++                ", partition=" + partition +
++                '}';
++    }
++
++    @Override
++    public String getId() {
++        return "partition_" + partition;
++    }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
index 0000000,0000000..d28248d
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
@@@ -1,0 -1,0 +1,9 @@@
++package storm.kafka;
++
++import java.util.List;
++
++public interface PartitionCoordinator {
++    List<PartitionManager> getMyManagedPartitions();
++
++    PartitionManager getManager(Partition partition);
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 0000000,0000000..03075bb
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@@ -1,0 -1,0 +1,224 @@@
++package storm.kafka;
++
++import backtype.storm.Config;
++import backtype.storm.metric.api.CombinedMetric;
++import backtype.storm.metric.api.CountMetric;
++import backtype.storm.metric.api.MeanReducer;
++import backtype.storm.metric.api.ReducedMetric;
++import backtype.storm.spout.SpoutOutputCollector;
++import com.google.common.collect.ImmutableMap;
++import kafka.api.OffsetRequest;
++import kafka.javaapi.consumer.SimpleConsumer;
++import kafka.javaapi.message.ByteBufferMessageSet;
++import kafka.message.MessageAndOffset;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.KafkaSpout.EmitState;
++import storm.kafka.KafkaSpout.MessageAndRealOffset;
++import storm.kafka.trident.MaxMetric;
++
++import java.util.*;
++
++public class PartitionManager {
++    public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);
++    private final CombinedMetric _fetchAPILatencyMax;
++    private final ReducedMetric _fetchAPILatencyMean;
++    private final CountMetric _fetchAPICallCount;
++    private final CountMetric _fetchAPIMessageCount;
++
++    static class KafkaMessageId {
++        public Partition partition;
++        public long offset;
++
++        public KafkaMessageId(Partition partition, long offset) {
++            this.partition = partition;
++            this.offset = offset;
++        }
++    }
++
++    Long _emittedToOffset;
++    SortedSet<Long> _pending = new TreeSet<Long>();
++    Long _committedTo;
++    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
++    Partition _partition;
++    SpoutConfig _spoutConfig;
++    String _topologyInstanceId;
++    SimpleConsumer _consumer;
++    DynamicPartitionConnections _connections;
++    ZkState _state;
++    Map _stormConf;
++
++
++    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
++        _partition = id;
++        _connections = connections;
++        _spoutConfig = spoutConfig;
++        _topologyInstanceId = topologyInstanceId;
++        _consumer = connections.register(id.host, id.partition);
++        _state = state;
++        _stormConf = stormConf;
++
++        String jsonTopologyId = null;
++        Long jsonOffset = null;
++        String path = committedPath();
++        try {
++            Map<Object, Object> json = _state.readJSON(path);
++            LOG.info("Read partition information from: " + path +  " --> " + json );
++            if (json != null) {
++                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
++                jsonOffset = (Long) json.get("offset");
++            }
++        } catch (Throwable e) {
++            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
++        }
++
++        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
++            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
++            LOG.info("No partition information found, using configuration to determine offset");
++        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
++            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
++            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
++        } else {
++            _committedTo = jsonOffset;
++            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
++        }
++
++        LOG.info("Starting " + _partition + " from offset " + _committedTo);
++        _emittedToOffset = _committedTo;
++
++        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
++        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
++        _fetchAPICallCount = new CountMetric();
++        _fetchAPIMessageCount = new CountMetric();
++    }
++
++    public Map getMetricsDataMap() {
++        Map ret = new HashMap();
++        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
++        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
++        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
++        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
++        return ret;
++    }
++
++    // emits the next message(s); returns EmitState.NO_EMITTED once the current batch is exhausted
++    public EmitState next(SpoutOutputCollector collector) {
++        if (_waitingToEmit.isEmpty()) {
++            fill();
++        }
++        while (true) {
++            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
++            if (toEmit == null) {
++                return EmitState.NO_EMITTED;
++            }
++            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
++            if (tups != null) {
++                for (List<Object> tup : tups) {
++                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
++                }
++                break;
++            } else {
++                ack(toEmit.offset);
++            }
++        }
++        if (!_waitingToEmit.isEmpty()) {
++            return EmitState.EMITTED_MORE_LEFT;
++        } else {
++            return EmitState.EMITTED_END;
++        }
++    }
++
++    private void fill() {
++        long start = System.nanoTime();
++        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
++        long end = System.nanoTime();
++        long millis = (end - start) / 1000000;
++        _fetchAPILatencyMax.update(millis);
++        _fetchAPILatencyMean.update(millis);
++        _fetchAPICallCount.incr();
++        int numMessages = countMessages(msgs);
++        _fetchAPIMessageCount.incrBy(numMessages);
++
++        if (numMessages > 0) {
++            LOG.info("Fetched " + numMessages + " messages from: " + _partition);
++        }
++        for (MessageAndOffset msg : msgs) {
++            _pending.add(_emittedToOffset);
++            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
++            _emittedToOffset = msg.nextOffset();
++        }
++        if (numMessages > 0) {
++            LOG.info("Added " + numMessages + " messages from: " + _partition + " to internal buffers");
++        }
++    }
++
++    private int countMessages(ByteBufferMessageSet messageSet) {
++        int counter = 0;
++        for (MessageAndOffset messageAndOffset : messageSet) {
++            counter = counter + 1;
++        }
++        return counter;
++    }
++
++    public void ack(Long offset) {
++        _pending.remove(offset);
++    }
++
++    public void fail(Long offset) {
++        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
++        // things might get crazy with lots of timeouts
++        if (_emittedToOffset > offset) {
++            _emittedToOffset = offset;
++            _pending.tailSet(offset).clear();
++        }
++    }
++
++    public void commit() {
++        long lastCompletedOffset = lastCompletedOffset();
++        if (lastCompletedOffset != lastCommittedOffset()) {
++            LOG.info("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
++            Map<Object, Object> data = ImmutableMap.builder()
++                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
++                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
++                    .put("offset", lastCompletedOffset)
++                    .put("partition", _partition.partition)
++                    .put("broker", ImmutableMap.of("host", _partition.host.host,
++                            "port", _partition.host.port))
++                    .put("topic", _spoutConfig.topic).build();
++            _state.writeJSON(committedPath(), data);
++            _committedTo = lastCompletedOffset;
++            LOG.info("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
++        } else {
++            LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
++        }
++    }
++
++    private String committedPath() {
++        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
++    }
++
++    public long queryPartitionOffsetLatestTime() {
++        return KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition,
++                OffsetRequest.LatestTime());
++    }
++
++    public long lastCommittedOffset() {
++        return _committedTo;
++    }
++
++    public long lastCompletedOffset() {
++        if (_pending.isEmpty()) {
++            return _emittedToOffset;
++        } else {
++            return _pending.first();
++        }
++    }
++
++    public Partition getPartition() {
++        return _partition;
++    }
++
++    public void close() {
++        _connections.unregister(_partition.host, _partition.partition);
++    }
++}
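
For orientation, the commit() method above persists the spout's progress as a JSON document at <zkRoot>/<id>/partition_<n> (the path produced by committedPath() and Partition.getId()). The values below are purely illustrative:

    {
        "topology": {"id": "<topologyInstanceId>", "name": "<TOPOLOGY_NAME>"},
        "offset": 4824,
        "partition": 1,
        "broker": {"host": "kafkahost1", "port": 9092},
        "topic": "test-topic"
    }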

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
index 0000000,0000000..05551ec
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
@@@ -1,0 -1,0 +1,19 @@@
++package storm.kafka;
++
++import java.io.Serializable;
++import java.util.List;
++
++
++public class SpoutConfig extends KafkaConfig implements Serializable {
++    public List<String> zkServers = null;
++    public Integer zkPort = null;
++    public String zkRoot = null;
++    public String id = null;
++    public long stateUpdateIntervalMs = 2000;
++
++    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
++        super(hosts, topic);
++        this.zkRoot = zkRoot;
++        this.id = id;
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
index 0000000,0000000..040060c
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
@@@ -1,0 -1,0 +1,31 @@@
++package storm.kafka;
++
++import java.util.ArrayList;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++
++public class StaticCoordinator implements PartitionCoordinator {
++    Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
++    List<PartitionManager> _allManagers = new ArrayList<PartitionManager>();
++
++    public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
++        StaticHosts hosts = (StaticHosts) config.hosts;
++        List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(hosts.getPartitionInformation(), totalTasks, taskIndex);
++        for (Partition myPartition : myPartitions) {
++            _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
++        }
++        _allManagers = new ArrayList<PartitionManager>(_managers.values());
++    }
++
++    @Override
++    public List<PartitionManager> getMyManagedPartitions() {
++        return _allManagers;
++    }
++
++    public PartitionManager getManager(Partition partition) {
++        return _managers.get(partition);
++    }
++
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java
index 0000000,0000000..9ed7193
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java
@@@ -1,0 -1,0 +1,21 @@@
++package storm.kafka;
++
++import storm.kafka.trident.GlobalPartitionInformation;
++
++/**
++ * Date: 11/05/2013
++ * Time: 14:43
++ */
++public class StaticHosts implements BrokerHosts {
++
++
++    private GlobalPartitionInformation partitionInformation;
++
++    public StaticHosts(GlobalPartitionInformation partitionInformation) {
++        this.partitionInformation = partitionInformation;
++    }
++
++    public GlobalPartitionInformation getPartitionInformation() {
++        return partitionInformation;
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java
index 0000000,0000000..a9b9db1
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java
@@@ -1,0 -1,0 +1,35 @@@
++package storm.kafka;
++
++import kafka.javaapi.consumer.SimpleConsumer;
++
++import java.util.HashMap;
++import java.util.Map;
++
++public class StaticPartitionConnections {
++    Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, SimpleConsumer>();
++    KafkaConfig _config;
++    StaticHosts hosts;
++
++    public StaticPartitionConnections(KafkaConfig conf) {
++        _config = conf;
++        if (!(conf.hosts instanceof StaticHosts)) {
++            throw new RuntimeException("Must configure with static hosts");
++        }
++        this.hosts = (StaticHosts) conf.hosts;
++    }
++
++    public SimpleConsumer getConsumer(int partition) {
++        if (!_kafka.containsKey(partition)) {
++            Broker hp = hosts.getPartitionInformation().getBrokerFor(partition);
++            _kafka.put(partition, new SimpleConsumer(hp.host, hp.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
++
++        }
++        return _kafka.get(partition);
++    }
++
++    public void close() {
++        for (SimpleConsumer consumer : _kafka.values()) {
++            consumer.close();
++        }
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
index 0000000,0000000..a6adddb
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
@@@ -1,0 -1,0 +1,20 @@@
++package storm.kafka;
++
++import backtype.storm.tuple.Values;
++import com.google.common.collect.ImmutableMap;
++
++import java.util.List;
++
++public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {
++
++    @Override
++    public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
++        if (key == null) {
++            return deserialize(value);
++        }
++        String keyString = StringScheme.deserializeString(key);
++        String valueString = StringScheme.deserializeString(value);
++        return new Values(ImmutableMap.of(keyString, valueString));
++    }
++
++}
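
A quick illustration of the two code paths in StringKeyValueScheme (the byte arrays and the use of the platform default charset here are just for the example):

    StringKeyValueScheme scheme = new StringKeyValueScheme();
    // Null key: falls back to StringScheme and emits a single "str" field.
    List<Object> valueOnly = scheme.deserializeKeyAndValue(null, "v".getBytes());       // ["v"]
    // Non-null key: emits one field holding a single-entry map of key -> value.
    List<Object> keyed = scheme.deserializeKeyAndValue("k".getBytes(), "v".getBytes()); // [{k=v}]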

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
index 0000000,0000000..a809448
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
@@@ -1,0 -1,0 +1,29 @@@
++package storm.kafka;
++
++import backtype.storm.spout.Scheme;
++import backtype.storm.tuple.Fields;
++import backtype.storm.tuple.Values;
++
++import java.io.UnsupportedEncodingException;
++import java.util.List;
++
++public class StringScheme implements Scheme {
++
++    public static final String STRING_SCHEME_KEY = "str";
++
++    public List<Object> deserialize(byte[] bytes) {
++        return new Values(deserializeString(bytes));
++    }
++
++    public static String deserializeString(byte[] string) {
++        try {
++            return new String(string, "UTF-8");
++        } catch (UnsupportedEncodingException e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++    public Fields getOutputFields() {
++        return new Fields(STRING_SCHEME_KEY);
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
index 0000000,0000000..ec35aed
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
@@@ -1,0 -1,0 +1,95 @@@
++package storm.kafka;
++
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.trident.GlobalPartitionInformation;
++
++import java.util.*;
++
++import static storm.kafka.KafkaUtils.taskId;
++
++public class ZkCoordinator implements PartitionCoordinator {
++    public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
++
++    SpoutConfig _spoutConfig;
++    int _taskIndex;
++    int _totalTasks;
++    String _topologyInstanceId;
++    Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
++    List<PartitionManager> _cachedList;
++    Long _lastRefreshTime = null;
++    int _refreshFreqMs;
++    DynamicPartitionConnections _connections;
++    DynamicBrokersReader _reader;
++    ZkState _state;
++    Map _stormConf;
++
++    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
++        this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
++    }
++
++    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
++        _spoutConfig = spoutConfig;
++        _connections = connections;
++        _taskIndex = taskIndex;
++        _totalTasks = totalTasks;
++        _topologyInstanceId = topologyInstanceId;
++        _stormConf = stormConf;
++        _state = state;
++        ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
++        _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
++        _reader = reader;
++    }
++
++    private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {
++        ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
++        return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic);
++    }
++
++    @Override
++    public List<PartitionManager> getMyManagedPartitions() {
++        if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
++            refresh();
++            _lastRefreshTime = System.currentTimeMillis();
++        }
++        return _cachedList;
++    }
++
++    void refresh() {
++        try {
++            LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
++            GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
++            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);
++
++            Set<Partition> curr = _managers.keySet();
++            Set<Partition> newPartitions = new HashSet<Partition>(mine);
++            newPartitions.removeAll(curr);
++
++            Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
++            deletedPartitions.removeAll(mine);
++
++            LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());
++
++            for (Partition id : deletedPartitions) {
++                PartitionManager man = _managers.remove(id);
++                man.close();
++            }
++            LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());
++
++            for (Partition id : newPartitions) {
++                PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
++                _managers.put(id, man);
++            }
++
++        } catch (Exception e) {
++            throw new RuntimeException(e);
++        }
++        _cachedList = new ArrayList<PartitionManager>(_managers.values());
++        LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
++    }
++
++    @Override
++    public PartitionManager getManager(Partition partition) {
++        return _managers.get(partition);
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
index 0000000,0000000..f2e0fc2
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
@@@ -1,0 -1,0 +1,22 @@@
++package storm.kafka;
++
++/**
++ * Date: 11/05/2013
++ * Time: 14:38
++ */
++public class ZkHosts implements BrokerHosts {
++    private static final String DEFAULT_ZK_PATH = "/brokers";
++
++    public String brokerZkStr = null;
++    public String brokerZkPath = null; // e.g., /kafka/brokers
++    public int refreshFreqSecs = 60;
++
++    public ZkHosts(String brokerZkStr, String brokerZkPath) {
++        this.brokerZkStr = brokerZkStr;
++        this.brokerZkPath = brokerZkPath;
++    }
++
++    public ZkHosts(String brokerZkStr) {
++        this(brokerZkStr, DEFAULT_ZK_PATH);
++    }
++}
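
Taken together with SpoutConfig above, a rough sketch of configuring a Kafka spout against ZooKeeper-hosted broker metadata might look like the following (the ZooKeeper connect string, topic, zkRoot and id are placeholders, and the KafkaSpout(SpoutConfig) constructor plus the public scheme field inherited from KafkaConfig are assumptions about the rest of the module):

    BrokerHosts hosts = new ZkHosts("zkhost1:2181,zkhost2:2181");       // broker path defaults to /brokers
    SpoutConfig spoutConfig = new SpoutConfig(hosts, "test-topic", "/kafka-spout", "my-spout-id");
    spoutConfig.scheme = new SchemeAsMultiScheme(new StringScheme());   // assumed field from KafkaConfig
    KafkaSpout spout = new KafkaSpout(spoutConfig);                     // assumed constructor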

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/ZkState.java
index 0000000,0000000..d5416af
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
@@@ -1,0 -1,0 +1,99 @@@
++package storm.kafka;
++
++import backtype.storm.Config;
++import backtype.storm.utils.Utils;
++import com.netflix.curator.framework.CuratorFramework;
++import com.netflix.curator.framework.CuratorFrameworkFactory;
++import com.netflix.curator.retry.RetryNTimes;
++import org.apache.zookeeper.CreateMode;
++import org.json.simple.JSONValue;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.nio.charset.Charset;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++public class ZkState {
++    public static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
++    CuratorFramework _curator;
++
++    private CuratorFramework newCurator(Map stateConf) throws Exception {
++        Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
++        String serverPorts = "";
++        for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
++            serverPorts = serverPorts + server + ":" + port + ",";
++        }
++        return CuratorFrameworkFactory.newClient(serverPorts,
++                Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
++                15000,
++                new RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
++                        Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
++    }
++
++    public CuratorFramework getCurator() {
++        assert _curator != null;
++        return _curator;
++    }
++
++    public ZkState(Map stateConf) {
++        stateConf = new HashMap(stateConf);
++
++        try {
++            _curator = newCurator(stateConf);
++            _curator.start();
++        } catch (Exception e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++    public void writeJSON(String path, Map<Object, Object> data) {
++        LOG.info("Writing " + path + " the data " + data.toString());
++        writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
++    }
++
++    public void writeBytes(String path, byte[] bytes) {
++        try {
++            if (_curator.checkExists().forPath(path) == null) {
++                _curator.create()
++                        .creatingParentsIfNeeded()
++                        .withMode(CreateMode.PERSISTENT)
++                        .forPath(path, bytes);
++            } else {
++                _curator.setData().forPath(path, bytes);
++            }
++        } catch (Exception e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++    public Map<Object, Object> readJSON(String path) {
++        try {
++            byte[] b = readBytes(path);
++            if (b == null) {
++                return null;
++            }
++            return (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
++        } catch (Exception e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++    public byte[] readBytes(String path) {
++        try {
++            if (_curator.checkExists().forPath(path) != null) {
++                return _curator.getData().forPath(path);
++            } else {
++                return null;
++            }
++        } catch (Exception e) {
++            throw new RuntimeException(e);
++        }
++    }
++
++    public void close() {
++        _curator.close();
++        _curator = null;
++    }
++}

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 0000000,0000000..89969d9
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@@ -1,0 -1,0 +1,72 @@@
++package storm.kafka.bolt;
++
++import backtype.storm.task.OutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseRichBolt;
++import backtype.storm.tuple.Tuple;
++import kafka.javaapi.producer.Producer;
++import kafka.producer.KeyedMessage;
++import kafka.producer.ProducerConfig;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++
++import java.util.Map;
++import java.util.Properties;
++
++
++/**
++ * Bolt implementation that can send Tuple data to Kafka
++ * <p/>
++ * It expects the producer configuration and topic in storm config under
++ * <p/>
++ * 'kafka.broker.properties' and 'topic'
++ * <p/>
++ * respectively.
++ */
++public class KafkaBolt<K, V> extends BaseRichBolt {
++
++    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);
++
++    public static final String TOPIC = "topic";
++    public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";
++
++    public static final String BOLT_KEY = "key";
++    public static final String BOLT_MESSAGE = "message";
++
++    private Producer<K, V> producer;
++    private OutputCollector collector;
++    private String topic;
++
++    @Override
++    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
++        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
++        Properties properties = new Properties();
++        properties.putAll(configMap);
++        ProducerConfig config = new ProducerConfig(properties);
++        producer = new Producer<K, V>(config);
++        this.topic = (String) stormConf.get(TOPIC);
++        this.collector = collector;
++    }
++
++    @Override
++    public void execute(Tuple input) {
++        K key = null;
++        if (input.contains(BOLT_KEY)) {
++            key = (K) input.getValueByField(BOLT_KEY);
++        }
++        V message = (V) input.getValueByField(BOLT_MESSAGE);
++        try {
++            producer.send(new KeyedMessage<K, V>(topic, key, message));
++        } catch (Exception ex) {
++            LOG.error("Could not send message with key '" + key + "' and value '" + message + "'", ex);
++        } finally {
++            collector.ack(input);
++        }
++    }
++
++    @Override
++    public void declareOutputFields(OutputFieldsDeclarer declarer) {
++
++    }
++}
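
Building on the class comment above, a hedged sketch of how a topology might configure this bolt (the property names follow the Kafka 0.8 producer API; the TopologyBuilder wiring and the upstream "events" component are placeholders):

    Properties props = new Properties();
    props.put("metadata.broker.list", "kafkahost1:9092,kafkahost2:9092");
    props.put("serializer.class", "kafka.serializer.StringEncoder");

    Config conf = new Config();
    conf.put(KafkaBolt.KAFKA_BROKER_PROPERTIES, props);
    conf.put(KafkaBolt.TOPIC, "test-topic");

    // The upstream component is expected to emit a "message" field and, optionally, a "key" field.
    builder.setBolt("kafka-bolt", new KafkaBolt<String, String>()).shuffleGrouping("events");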

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java
index 0000000,0000000..f67acaa
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java
@@@ -1,0 -1,0 +1,37 @@@
++package storm.kafka.trident;
++
++import storm.kafka.KafkaUtils;
++import storm.trident.spout.IOpaquePartitionedTridentSpout;
++import storm.trident.spout.IPartitionedTridentSpout;
++
++import java.util.Map;
++
++/**
++ * Date: 11/05/2013
++ * Time: 19:35
++ */
++class Coordinator implements IPartitionedTridentSpout.Coordinator<GlobalPartitionInformation>, IOpaquePartitionedTridentSpout.Coordinator<GlobalPartitionInformation> {
++
++    private IBrokerReader reader;
++    private TridentKafkaConfig config;
++
++    public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
++        config = tridentKafkaConfig;
++        reader = KafkaUtils.makeBrokerReader(conf, config);
++    }
++
++    @Override
++    public void close() {
++        config.coordinator.close();
++    }
++
++    @Override
++    public boolean isReady(long txid) {
++        return config.coordinator.isReady(txid);
++    }
++
++    @Override
++    public GlobalPartitionInformation getPartitionsForBatch() {
++        return reader.getCurrentBrokers();
++    }
++}