You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:42 UTC
[43/50] [abbrv] git commit: Add storm-kafka as an external module.
Add storm-kafka as an external module.
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/7d1bf2a9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/7d1bf2a9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/7d1bf2a9
Branch: refs/heads/master
Commit: 7d1bf2a923f8416197f81b467d40f959b2f4f997
Parents: 38ea0ca 8fafbad
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Apr 9 10:26:08 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Apr 9 10:26:08 2014 -0400
----------------------------------------------------------------------
external/storm-kafka/CHANGELOG.md | 10 +
external/storm-kafka/LICENSE | 191 ++++++++++++++
external/storm-kafka/README.md | 22 ++
external/storm-kafka/pom.xml | 138 ++++++++++
.../storm-kafka/src/jvm/storm/kafka/Broker.java | 63 +++++
.../src/jvm/storm/kafka/BrokerHosts.java | 11 +
.../jvm/storm/kafka/DynamicBrokersReader.java | 124 +++++++++
.../kafka/DynamicPartitionConnections.java | 77 ++++++
.../jvm/storm/kafka/FailedFetchException.java | 12 +
.../src/jvm/storm/kafka/KafkaConfig.java | 33 +++
.../src/jvm/storm/kafka/KafkaError.java | 30 +++
.../src/jvm/storm/kafka/KafkaSpout.java | 173 +++++++++++++
.../src/jvm/storm/kafka/KafkaUtils.java | 218 ++++++++++++++++
.../src/jvm/storm/kafka/KeyValueScheme.java | 11 +
.../kafka/KeyValueSchemeAsMultiScheme.java | 19 ++
.../src/jvm/storm/kafka/Partition.java | 47 ++++
.../jvm/storm/kafka/PartitionCoordinator.java | 9 +
.../src/jvm/storm/kafka/PartitionManager.java | 224 ++++++++++++++++
.../src/jvm/storm/kafka/SpoutConfig.java | 19 ++
.../src/jvm/storm/kafka/StaticCoordinator.java | 31 +++
.../src/jvm/storm/kafka/StaticHosts.java | 21 ++
.../storm/kafka/StaticPartitionConnections.java | 35 +++
.../jvm/storm/kafka/StringKeyValueScheme.java | 20 ++
.../src/jvm/storm/kafka/StringScheme.java | 29 +++
.../src/jvm/storm/kafka/ZkCoordinator.java | 95 +++++++
.../src/jvm/storm/kafka/ZkHosts.java | 22 ++
.../src/jvm/storm/kafka/ZkState.java | 99 +++++++
.../src/jvm/storm/kafka/bolt/KafkaBolt.java | 72 ++++++
.../jvm/storm/kafka/trident/Coordinator.java | 37 +++
.../storm/kafka/trident/DefaultCoordinator.java | 14 +
.../trident/GlobalPartitionInformation.java | 85 ++++++
.../storm/kafka/trident/IBatchCoordinator.java | 9 +
.../jvm/storm/kafka/trident/IBrokerReader.java | 8 +
.../src/jvm/storm/kafka/trident/MaxMetric.java | 23 ++
.../kafka/trident/OpaqueTridentKafkaSpout.java | 42 +++
.../storm/kafka/trident/StaticBrokerReader.java | 19 ++
.../trident/TransactionalTridentKafkaSpout.java | 41 +++
.../storm/kafka/trident/TridentKafkaConfig.java | 20 ++
.../kafka/trident/TridentKafkaEmitter.java | 256 +++++++++++++++++++
.../jvm/storm/kafka/trident/ZkBrokerReader.java | 45 ++++
.../storm/kafka/DynamicBrokersReaderTest.java | 155 +++++++++++
.../src/test/storm/kafka/KafkaErrorTest.java | 39 +++
.../src/test/storm/kafka/KafkaTestBroker.java | 53 ++++
.../src/test/storm/kafka/KafkaUtilsTest.java | 221 ++++++++++++++++
.../storm/kafka/StringKeyValueSchemeTest.java | 38 +++
.../src/test/storm/kafka/TestUtils.java | 20 ++
.../src/test/storm/kafka/ZkCoordinatorTest.java | 130 ++++++++++
.../test/storm/kafka/bolt/KafkaBoltTest.java | 171 +++++++++++++
pom.xml | 1 +
49 files changed, 3282 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/CHANGELOG.md
----------------------------------------------------------------------
diff --cc external/storm-kafka/CHANGELOG.md
index 0000000,0000000..f876421
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/CHANGELOG.md
@@@ -1,0 -1,0 +1,10 @@@
++## 0.5.0
++* fixed partition assignment for KafkaSpout
++* upgraded to storm 0.9.1
++## 0.4.0
++* added support for reading kafka message keys
++* configurable metrics emit interval
++## 0.3.0
++* updated partition path in zookeeper
++* added error handling for fetch request
++
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/LICENSE
----------------------------------------------------------------------
diff --cc external/storm-kafka/LICENSE
index 0000000,0000000..37ec93a
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/LICENSE
@@@ -1,0 -1,0 +1,191 @@@
++Apache License
++Version 2.0, January 2004
++http://www.apache.org/licenses/
++
++TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
++
++1. Definitions.
++
++"License" shall mean the terms and conditions for use, reproduction, and
++distribution as defined by Sections 1 through 9 of this document.
++
++"Licensor" shall mean the copyright owner or entity authorized by the copyright
++owner that is granting the License.
++
++"Legal Entity" shall mean the union of the acting entity and all other entities
++that control, are controlled by, or are under common control with that entity.
++For the purposes of this definition, "control" means (i) the power, direct or
++indirect, to cause the direction or management of such entity, whether by
++contract or otherwise, or (ii) ownership of fifty percent (50%) or more of the
++outstanding shares, or (iii) beneficial ownership of such entity.
++
++"You" (or "Your") shall mean an individual or Legal Entity exercising
++permissions granted by this License.
++
++"Source" form shall mean the preferred form for making modifications, including
++but not limited to software source code, documentation source, and configuration
++files.
++
++"Object" form shall mean any form resulting from mechanical transformation or
++translation of a Source form, including but not limited to compiled object code,
++generated documentation, and conversions to other media types.
++
++"Work" shall mean the work of authorship, whether in Source or Object form, made
++available under the License, as indicated by a copyright notice that is included
++in or attached to the work (an example is provided in the Appendix below).
++
++"Derivative Works" shall mean any work, whether in Source or Object form, that
++is based on (or derived from) the Work and for which the editorial revisions,
++annotations, elaborations, or other modifications represent, as a whole, an
++original work of authorship. For the purposes of this License, Derivative Works
++shall not include works that remain separable from, or merely link (or bind by
++name) to the interfaces of, the Work and Derivative Works thereof.
++
++"Contribution" shall mean any work of authorship, including the original version
++of the Work and any modifications or additions to that Work or Derivative Works
++thereof, that is intentionally submitted to Licensor for inclusion in the Work
++by the copyright owner or by an individual or Legal Entity authorized to submit
++on behalf of the copyright owner. For the purposes of this definition,
++"submitted" means any form of electronic, verbal, or written communication sent
++to the Licensor or its representatives, including but not limited to
++communication on electronic mailing lists, source code control systems, and
++issue tracking systems that are managed by, or on behalf of, the Licensor for
++the purpose of discussing and improving the Work, but excluding communication
++that is conspicuously marked or otherwise designated in writing by the copyright
++owner as "Not a Contribution."
++
++"Contributor" shall mean Licensor and any individual or Legal Entity on behalf
++of whom a Contribution has been received by Licensor and subsequently
++incorporated within the Work.
++
++2. Grant of Copyright License.
++
++Subject to the terms and conditions of this License, each Contributor hereby
++grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
++irrevocable copyright license to reproduce, prepare Derivative Works of,
++publicly display, publicly perform, sublicense, and distribute the Work and such
++Derivative Works in Source or Object form.
++
++3. Grant of Patent License.
++
++Subject to the terms and conditions of this License, each Contributor hereby
++grants to You a perpetual, worldwide, non-exclusive, no-charge, royalty-free,
++irrevocable (except as stated in this section) patent license to make, have
++made, use, offer to sell, sell, import, and otherwise transfer the Work, where
++such license applies only to those patent claims licensable by such Contributor
++that are necessarily infringed by their Contribution(s) alone or by combination
++of their Contribution(s) with the Work to which such Contribution(s) was
++submitted. If You institute patent litigation against any entity (including a
++cross-claim or counterclaim in a lawsuit) alleging that the Work or a
++Contribution incorporated within the Work constitutes direct or contributory
++patent infringement, then any patent licenses granted to You under this License
++for that Work shall terminate as of the date such litigation is filed.
++
++4. Redistribution.
++
++You may reproduce and distribute copies of the Work or Derivative Works thereof
++in any medium, with or without modifications, and in Source or Object form,
++provided that You meet the following conditions:
++
++You must give any other recipients of the Work or Derivative Works a copy of
++this License; and
++You must cause any modified files to carry prominent notices stating that You
++changed the files; and
++You must retain, in the Source form of any Derivative Works that You distribute,
++all copyright, patent, trademark, and attribution notices from the Source form
++of the Work, excluding those notices that do not pertain to any part of the
++Derivative Works; and
++If the Work includes a "NOTICE" text file as part of its distribution, then any
++Derivative Works that You distribute must include a readable copy of the
++attribution notices contained within such NOTICE file, excluding those notices
++that do not pertain to any part of the Derivative Works, in at least one of the
++following places: within a NOTICE text file distributed as part of the
++Derivative Works; within the Source form or documentation, if provided along
++with the Derivative Works; or, within a display generated by the Derivative
++Works, if and wherever such third-party notices normally appear. The contents of
++the NOTICE file are for informational purposes only and do not modify the
++License. You may add Your own attribution notices within Derivative Works that
++You distribute, alongside or as an addendum to the NOTICE text from the Work,
++provided that such additional attribution notices cannot be construed as
++modifying the License.
++You may add Your own copyright statement to Your modifications and may provide
++additional or different license terms and conditions for use, reproduction, or
++distribution of Your modifications, or for any such Derivative Works as a whole,
++provided Your use, reproduction, and distribution of the Work otherwise complies
++with the conditions stated in this License.
++
++5. Submission of Contributions.
++
++Unless You explicitly state otherwise, any Contribution intentionally submitted
++for inclusion in the Work by You to the Licensor shall be under the terms and
++conditions of this License, without any additional terms or conditions.
++Notwithstanding the above, nothing herein shall supersede or modify the terms of
++any separate license agreement you may have executed with Licensor regarding
++such Contributions.
++
++6. Trademarks.
++
++This License does not grant permission to use the trade names, trademarks,
++service marks, or product names of the Licensor, except as required for
++reasonable and customary use in describing the origin of the Work and
++reproducing the content of the NOTICE file.
++
++7. Disclaimer of Warranty.
++
++Unless required by applicable law or agreed to in writing, Licensor provides the
++Work (and each Contributor provides its Contributions) on an "AS IS" BASIS,
++WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied,
++including, without limitation, any warranties or conditions of TITLE,
++NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A PARTICULAR PURPOSE. You are
++solely responsible for determining the appropriateness of using or
++redistributing the Work and assume any risks associated with Your exercise of
++permissions under this License.
++
++8. Limitation of Liability.
++
++In no event and under no legal theory, whether in tort (including negligence),
++contract, or otherwise, unless required by applicable law (such as deliberate
++and grossly negligent acts) or agreed to in writing, shall any Contributor be
++liable to You for damages, including any direct, indirect, special, incidental,
++or consequential damages of any character arising as a result of this License or
++out of the use or inability to use the Work (including but not limited to
++damages for loss of goodwill, work stoppage, computer failure or malfunction, or
++any and all other commercial damages or losses), even if such Contributor has
++been advised of the possibility of such damages.
++
++9. Accepting Warranty or Additional Liability.
++
++While redistributing the Work or Derivative Works thereof, You may choose to
++offer, and charge a fee for, acceptance of support, warranty, indemnity, or
++other liability obligations and/or rights consistent with this License. However,
++in accepting such obligations, You may act only on Your own behalf and on Your
++sole responsibility, not on behalf of any other Contributor, and only if You
++agree to indemnify, defend, and hold each Contributor harmless for any liability
++incurred by, or claims asserted against, such Contributor by reason of your
++accepting any such warranty or additional liability.
++
++END OF TERMS AND CONDITIONS
++
++APPENDIX: How to apply the Apache License to your work
++
++To apply the Apache License to your work, attach the following boilerplate
++notice, with the fields enclosed by brackets "[]" replaced with your own
++identifying information. (Don't include the brackets!) The text should be
++enclosed in the appropriate comment syntax for the file format. We also
++recommend that a file or class name and description of purpose be included on
++the same "printed page" as the copyright notice for easier identification within
++third-party archives.
++
++ Copyright [yyyy] [name of copyright owner]
++
++ Licensed under the Apache License, Version 2.0 (the "License");
++ you may not use this file except in compliance with the License.
++ You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --cc external/storm-kafka/README.md
index 0000000,0000000..874db01
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/README.md
@@@ -1,0 -1,0 +1,22 @@@
++storm-kafka-0.8-plus
++====================
++
++Port of storm-kafka to support kafka >= 0.8
++
++##Usage:
++For information on how to use this library in your project see:
++
++[https://clojars.org/net.wurstmeister.storm/storm-kafka-0.8-plus](https://clojars.org/net.wurstmeister.storm/storm-kafka-0.8-plus)
++
++
++##Example Topologies:
++
++[https://github.com/wurstmeister/storm-kafka-0.8-plus-test](https://github.com/wurstmeister/storm-kafka-0.8-plus-test)
++
++##Acknowledgement:
++
++YourKit is kindly supporting this open source project with its full-featured Java Profiler.
++YourKit, LLC is the creator of innovative and intelligent tools for profiling
++Java and .NET applications. Take a look at YourKit's leading software products:
++<a href="http://www.yourkit.com/java/profiler/index.jsp">YourKit Java Profiler</a> and
++<a href="http://www.yourkit.com/.net/profiler/index.jsp">YourKit .NET Profiler</a>.
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/pom.xml
----------------------------------------------------------------------
diff --cc external/storm-kafka/pom.xml
index 0000000,0000000..15743b6
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/pom.xml
@@@ -1,0 -1,0 +1,138 @@@
++<?xml version="1.0" encoding="UTF-8"?>
++<!--
++ Licensed to the Apache Software Foundation (ASF) under one or more
++ contributor license agreements. See the NOTICE file distributed with
++ this work for additional information regarding copyright ownership.
++ The ASF licenses this file to You under the Apache License, Version 2.0
++ (the "License"); you may not use this file except in compliance with
++ the License. You may obtain a copy of the License at
++
++ http://www.apache.org/licenses/LICENSE-2.0
++
++ Unless required by applicable law or agreed to in writing, software
++ distributed under the License is distributed on an "AS IS" BASIS,
++ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
++ See the License for the specific language governing permissions and
++ limitations under the License.
++-->
++<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
++ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
++ <modelVersion>4.0.0</modelVersion>
++
++ <parent>
++ <artifactId>storm</artifactId>
++ <groupId>org.apache.storm</groupId>
++ <version>0.9.2-incubating-SNAPSHOT</version>
++ <relativePath>../../pom.xml</relativePath>
++ </parent>
++
++ <packaging>jar</packaging>
++ <artifactId>storm-kafka</artifactId>
++ <name>storm-kafka</name>
++ <description>Storm Spouts for Apache Kafka</description>
++ <properties>
++ <scalaVersion>2.9.2</scalaVersion>
++ <kafkaArtifact>kafka_2.9.2</kafkaArtifact>
++ <envClassifier></envClassifier>
++ </properties>
++ <build>
++ <plugins>
++
++ </plugins>
++ <sourceDirectory>src/jvm</sourceDirectory>
++ <testSourceDirectory>src/test</testSourceDirectory>
++ </build>
++ <dependencies>
++ <dependency>
++ <groupId>org.mockito</groupId>
++ <artifactId>mockito-all</artifactId>
++ <version>1.9.0</version>
++ <scope>test</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.scala-lang</groupId>
++ <artifactId>scala-library</artifactId>
++ <version>${scalaVersion}</version>
++ </dependency>
++ <dependency>
++ <groupId>junit</groupId>
++ <artifactId>junit</artifactId>
++ <version>4.11</version>
++ <scope>test</scope>
++ </dependency>
++ <dependency>
++ <groupId>com.netflix.curator</groupId>
++ <artifactId>curator-framework</artifactId>
++ <version>1.3.3</version>
++ <exclusions>
++ <exclusion>
++ <groupId>log4j</groupId>
++ <artifactId>log4j</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.slf4j</groupId>
++ <artifactId>slf4j-log4j12</artifactId>
++ </exclusion>
++ </exclusions>
++ </dependency>
++ <dependency>
++ <groupId>com.netflix.curator</groupId>
++ <artifactId>curator-recipes</artifactId>
++ <version>1.3.3</version>
++ <exclusions>
++ <exclusion>
++ <groupId>log4j</groupId>
++ <artifactId>log4j</artifactId>
++ </exclusion>
++ </exclusions>
++ <scope>test</scope>
++ </dependency>
++ <dependency>
++ <groupId>com.netflix.curator</groupId>
++ <artifactId>curator-test</artifactId>
++ <version>1.3.3</version>
++ <exclusions>
++ <exclusion>
++ <groupId>log4j</groupId>
++ <artifactId>log4j</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>org.testng</groupId>
++ <artifactId>testng</artifactId>
++ </exclusion>
++ </exclusions>
++ <scope>test</scope>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.kafka</groupId>
++ <artifactId>${kafkaArtifact}</artifactId>
++ <version>0.8.0</version>
++ <exclusions>
++ <exclusion>
++ <groupId>org.apache.zookeeper</groupId>
++ <artifactId>zookeeper</artifactId>
++ </exclusion>
++ <exclusion>
++ <groupId>log4j</groupId>
++ <artifactId>log4j</artifactId>
++ </exclusion>
++ </exclusions>
++ </dependency>
++ <dependency>
++ <groupId>org.apache.storm</groupId>
++ <artifactId>storm-core</artifactId>
++ <version>${project.version}</version>
++ <scope>provided</scope>
++ </dependency>
++ </dependencies>
++ <profiles>
++ <profile>
++ <id>Scala-2.10</id>
++ <properties>
++ <scalaVersion>2.10.3</scalaVersion>
++ <kafkaArtifact>kafka_2.10</kafkaArtifact>
++ <envClassifier>scala_2.10</envClassifier>
++ </properties>
++ </profile>
++ </profiles>
++</project>
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/Broker.java
index 0000000,0000000..2451eee
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/Broker.java
@@@ -1,0 -1,0 +1,63 @@@
++package storm.kafka;
++
++import java.io.Serializable;
++import com.google.common.base.Objects;
++
++public class Broker implements Serializable, Comparable<Broker> {
++ public final String host;
++ public final int port;
++
++ public Broker(String host, int port) {
++ this.host = host;
++ this.port = port;
++ }
++
++ public Broker(String host) {
++ this(host, 9092);
++ }
++
++ @Override
++ public int hashCode() {
++ return Objects.hashCode(host, port);
++ }
++
++ @Override
++ public boolean equals(Object obj) {
++ if (this == obj) {
++ return true;
++ }
++ if (obj == null || getClass() != obj.getClass()) {
++ return false;
++ }
++ final Broker other = (Broker) obj;
++ return Objects.equal(this.host, other.host) && Objects.equal(this.port, other.port);
++ }
++
++ @Override
++ public String toString() {
++ return host + ":" + port;
++ }
++
++ public static Broker fromString(String host) {
++ Broker hp;
++ String[] spec = host.split(":");
++ if (spec.length == 1) {
++ hp = new Broker(spec[0]);
++ } else if (spec.length == 2) {
++ hp = new Broker(spec[0], Integer.parseInt(spec[1]));
++ } else {
++ throw new IllegalArgumentException("Invalid host specification: " + host);
++ }
++ return hp;
++ }
++
++
++ @Override
++ public int compareTo(Broker o) {
++ if (this.host.equals(o.host)) {
++ return this.port - o.port;
++ } else {
++ return this.host.compareTo(o.host);
++ }
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
index 0000000,0000000..12ef7b1
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/BrokerHosts.java
@@@ -1,0 -1,0 +1,11 @@@
++package storm.kafka;
++
++import java.io.Serializable;
++
++/**
++ * Date: 11/05/2013
++ * Time: 14:40
++ */
++public interface BrokerHosts extends Serializable {
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
index 0000000,0000000..cd751fe
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
@@@ -1,0 -1,0 +1,124 @@@
++package storm.kafka;
++
++import backtype.storm.Config;
++import backtype.storm.utils.Utils;
++import com.netflix.curator.framework.CuratorFramework;
++import com.netflix.curator.framework.CuratorFrameworkFactory;
++import com.netflix.curator.retry.RetryNTimes;
++import org.json.simple.JSONValue;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.trident.GlobalPartitionInformation;
++
++import java.io.IOException;
++import java.io.UnsupportedEncodingException;
++import java.util.List;
++import java.util.Map;
++
++public class DynamicBrokersReader {
++
++ public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
++
++ private CuratorFramework _curator;
++ private String _zkPath;
++ private String _topic;
++
++ public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
++ _zkPath = zkPath;
++ _topic = topic;
++ _curator = CuratorFrameworkFactory.newClient(
++ zkStr,
++ Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
++ 15000,
++ new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
++ Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
++ _curator.start();
++ }
++
++ /**
++ * Get all partitions with their current leaders
++ */
++ public GlobalPartitionInformation getBrokerInfo() {
++ GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
++ try {
++ int numPartitionsForTopic = getNumPartitions();
++ String brokerInfoPath = brokerPath();
++ for (int partition = 0; partition < numPartitionsForTopic; partition++) {
++ int leader = getLeaderFor(partition);
++ String path = brokerInfoPath + "/" + leader;
++ try {
++ byte[] brokerData = _curator.getData().forPath(path);
++ Broker hp = getBrokerHost(brokerData);
++ globalPartitionInformation.addPartition(partition, hp);
++ } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
++ LOG.error("Node {} does not exist ", path);
++ }
++ }
++ } catch (Exception e) {
++ throw new RuntimeException(e);
++ }
++ LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
++ return globalPartitionInformation;
++ }
++
++
++ private int getNumPartitions() {
++ try {
++ String topicBrokersPath = partitionPath();
++ List<String> children = _curator.getChildren().forPath(topicBrokersPath);
++ return children.size();
++ } catch (Exception e) {
++ throw new RuntimeException(e);
++ }
++ }
++
++ public String partitionPath() {
++ return _zkPath + "/topics/" + _topic + "/partitions";
++ }
++
++ public String brokerPath() {
++ return _zkPath + "/ids";
++ }
++
++ /**
++ * get /brokers/topics/distributedTopic/partitions/1/state
++ * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
++ *
++ * @param partition
++ * @return
++ */
++ private int getLeaderFor(long partition) {
++ try {
++ String topicBrokersPath = partitionPath();
++ byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
++ Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
++ Integer leader = ((Number) value.get("leader")).intValue();
++ return leader;
++ } catch (Exception e) {
++ throw new RuntimeException(e);
++ }
++ }
++
++ public void close() {
++ _curator.close();
++ }
++
++ /**
++ * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
++ * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
++ *
++ * @param contents
++ * @return
++ */
++ private Broker getBrokerHost(byte[] contents) {
++ try {
++ Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
++ String host = (String) value.get("host");
++ Integer port = ((Long) value.get("port")).intValue();
++ return new Broker(host, port);
++ } catch (UnsupportedEncodingException e) {
++ throw new RuntimeException(e);
++ }
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
index 0000000,0000000..8d0115b
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/DynamicPartitionConnections.java
@@@ -1,0 -1,0 +1,77 @@@
++package storm.kafka;
++
++import kafka.javaapi.consumer.SimpleConsumer;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.trident.IBrokerReader;
++
++import java.util.HashMap;
++import java.util.HashSet;
++import java.util.Map;
++import java.util.Set;
++
++
++public class DynamicPartitionConnections {
++
++ public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
++
++ static class ConnectionInfo {
++ SimpleConsumer consumer;
++ Set<Integer> partitions = new HashSet();
++
++ public ConnectionInfo(SimpleConsumer consumer) {
++ this.consumer = consumer;
++ }
++ }
++
++ Map<Broker, ConnectionInfo> _connections = new HashMap();
++ KafkaConfig _config;
++ IBrokerReader _reader;
++
++ public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) {
++ _config = config;
++ _reader = brokerReader;
++ }
++
++ public SimpleConsumer register(Partition partition) {
++ Broker broker = _reader.getCurrentBrokers().getBrokerFor(partition.partition);
++ return register(broker, partition.partition);
++ }
++
++ public SimpleConsumer register(Broker host, int partition) {
++ if (!_connections.containsKey(host)) {
++ _connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
++ }
++ ConnectionInfo info = _connections.get(host);
++ info.partitions.add(partition);
++ return info.consumer;
++ }
++
++ public SimpleConsumer getConnection(Partition partition) {
++ ConnectionInfo info = _connections.get(partition.host);
++ if (info != null) {
++ return info.consumer;
++ }
++ return null;
++ }
++
++ public void unregister(Broker port, int partition) {
++ ConnectionInfo info = _connections.get(port);
++ info.partitions.remove(partition);
++ if (info.partitions.isEmpty()) {
++ info.consumer.close();
++ _connections.remove(port);
++ }
++ }
++
++ public void unregister(Partition partition) {
++ unregister(partition.host, partition.partition);
++ }
++
++ public void clear() {
++ for (ConnectionInfo info : _connections.values()) {
++ info.consumer.close();
++ }
++ _connections.clear();
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
index 0000000,0000000..0bd1123
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/FailedFetchException.java
@@@ -1,0 -1,0 +1,12 @@@
++package storm.kafka;
++
++public class FailedFetchException extends RuntimeException {
++
++ public FailedFetchException(String message) {
++ super(message);
++ }
++
++ public FailedFetchException(Exception e) {
++ super(e);
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
index 0000000,0000000..8ef2a88
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaConfig.java
@@@ -1,0 -1,0 +1,33 @@@
++package storm.kafka;
++
++import backtype.storm.spout.MultiScheme;
++import backtype.storm.spout.RawMultiScheme;
++
++import java.io.Serializable;
++
/**
 * Serializable configuration shared by the Kafka spouts: identifies the
 * brokers and topic, plus tuning knobs for fetching and offset handling.
 */
public class KafkaConfig implements Serializable {

    public final BrokerHosts hosts;
    public final String topic;
    public final String clientId;

    // Maximum bytes requested per fetch, and per-connection socket settings.
    public int fetchSizeBytes = 1024 * 1024;
    public int socketTimeoutMs = 10000;
    public int bufferSizeBytes = 1024 * 1024;
    // How raw Kafka message bytes are deserialized into tuples.
    public MultiScheme scheme = new RawMultiScheme();
    // When true, ignore any previously committed offset and start from startOffsetTime.
    public boolean forceFromStart = false;
    public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
    // On an OFFSET_OUT_OF_RANGE fetch error, retry once from startOffsetTime
    // (see KafkaUtils.fetchMessages).
    public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
    // Bucket size, in seconds, for the metrics the spout registers.
    public int metricsTimeBucketSizeInSecs = 60;

    public KafkaConfig(BrokerHosts hosts, String topic) {
        this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
    }

    public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
        this.hosts = hosts;
        this.topic = topic;
        this.clientId = clientId;
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
index 0000000,0000000..a67335c
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaError.java
@@@ -1,0 -1,0 +1,30 @@@
++package storm.kafka;
++
/**
 * Kafka protocol error codes. The constants are declared so that each one's
 * ordinal equals the numeric error code it represents; {@link #getError(int)}
 * relies on that ordering.
 */
public enum KafkaError {
    NO_ERROR,
    OFFSET_OUT_OF_RANGE,
    INVALID_MESSAGE,
    UNKNOWN_TOPIC_OR_PARTITION,
    INVALID_FETCH_SIZE,
    LEADER_NOT_AVAILABLE,
    NOT_LEADER_FOR_PARTITION,
    REQUEST_TIMED_OUT,
    BROKER_NOT_AVAILABLE,
    REPLICA_NOT_AVAILABLE,
    MESSAGE_SIZE_TOO_LARGE,
    STALE_CONTROLLER_EPOCH,
    OFFSET_METADATA_TOO_LARGE,
    UNKNOWN;

    /**
     * Maps a numeric broker error code to its constant; any code outside the
     * known range (negative or beyond the declared constants) maps to UNKNOWN.
     */
    public static KafkaError getError(int errorCode) {
        boolean known = errorCode >= 0 && errorCode < UNKNOWN.ordinal();
        return known ? values()[errorCode] : UNKNOWN;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
index 0000000,0000000..79e33fe
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaSpout.java
@@@ -1,0 -1,0 +1,173 @@@
++package storm.kafka;
++
++import backtype.storm.Config;
++import backtype.storm.metric.api.IMetric;
++import backtype.storm.spout.SpoutOutputCollector;
++import backtype.storm.task.TopologyContext;
++import backtype.storm.topology.OutputFieldsDeclarer;
++import backtype.storm.topology.base.BaseRichSpout;
++import kafka.message.Message;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.PartitionManager.KafkaMessageId;
++
++import java.util.*;
++
// TODO: need to add blacklisting
// TODO: need to make a best effort to not re-emit messages if don't have to
/**
 * Core Storm spout that consumes a Kafka topic. Partition assignment is
 * delegated to a {@link PartitionCoordinator} (static or ZooKeeper-based),
 * per-partition consumption to {@link PartitionManager}, and consumed offsets
 * are periodically committed to ZooKeeper through {@link ZkState}.
 */
public class KafkaSpout extends BaseRichSpout {
    /** A Kafka message paired with the offset it was actually read from. */
    public static class MessageAndRealOffset {
        public Message msg;
        public long offset;

        public MessageAndRealOffset(Message msg, long offset) {
            this.msg = msg;
            this.offset = offset;
        }
    }

    /** Result of asking a PartitionManager to emit its next buffered message. */
    static enum EmitState {
        EMITTED_MORE_LEFT,  // emitted a tuple and more are still buffered
        EMITTED_END,        // emitted a tuple and the buffer is now empty
        NO_EMITTED          // nothing was available to emit
    }

    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);

    // Unique per-spout-instance id; PartitionManager compares it against the id
    // stored in ZooKeeper to detect topology redeploys.
    String _uuid = UUID.randomUUID().toString();
    SpoutConfig _spoutConfig;
    SpoutOutputCollector _collector;
    PartitionCoordinator _coordinator;
    DynamicPartitionConnections _connections;
    ZkState _state;

    // Timestamp (ms) of the last offset commit; 0 forces a commit on first nextTuple.
    long _lastUpdateMs = 0;

    // Round-robin cursor over this task's partition managers.
    int _currPartitionIndex = 0;

    public KafkaSpout(SpoutConfig spoutConf) {
        _spoutConfig = spoutConf;
    }

    @Override
    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
        _collector = collector;

        // Offset storage defaults to the topology's own ZooKeeper ensemble
        // unless the spout config overrides servers/port.
        Map stateConf = new HashMap(conf);
        List<String> zkServers = _spoutConfig.zkServers;
        if (zkServers == null) {
            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
        }
        Integer zkPort = _spoutConfig.zkPort;
        if (zkPort == null) {
            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
        }
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
        _state = new ZkState(stateConf);

        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));

        // using TransactionalState like this is a hack
        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
        // Static host lists get a fixed assignment; otherwise partitions are
        // discovered (and re-balanced) through ZooKeeper.
        if (_spoutConfig.hosts instanceof StaticHosts) {
            _coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        } else {
            _coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
        }

        // Metric reporting per-partition and total spout lag vs. Kafka's offsets.
        context.registerMetric("kafkaOffset", new IMetric() {
            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);

            @Override
            public Object getValueAndReset() {
                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
                Set<Partition> latestPartitions = new HashSet();
                for (PartitionManager pm : pms) {
                    latestPartitions.add(pm.getPartition());
                }
                _kafkaOffsetMetric.refreshPartitions(latestPartitions);
                for (PartitionManager pm : pms) {
                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
                }
                return _kafkaOffsetMetric.getValueAndReset();
            }
        }, _spoutConfig.metricsTimeBucketSizeInSecs);

        // Metric aggregating each PartitionManager's fetch-API counters.
        context.registerMetric("kafkaPartition", new IMetric() {
            @Override
            public Object getValueAndReset() {
                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
                Map concatMetricsDataMaps = new HashMap();
                for (PartitionManager pm : pms) {
                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
                }
                return concatMetricsDataMaps;
            }
        }, _spoutConfig.metricsTimeBucketSizeInSecs);
    }

    @Override
    public void close() {
        _state.close();
    }

    @Override
    public void nextTuple() {
        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
        for (int i = 0; i < managers.size(); i++) {

            // in case the number of managers decreased
            _currPartitionIndex = _currPartitionIndex % managers.size();
            EmitState state = managers.get(_currPartitionIndex).next(_collector);
            // Stay on the same partition only while it still has buffered messages.
            if (state != EmitState.EMITTED_MORE_LEFT) {
                _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
            }
            // Stop after the first partition that actually emitted something.
            if (state != EmitState.NO_EMITTED) {
                break;
            }
        }

        // Periodically commit consumed offsets to ZooKeeper.
        long now = System.currentTimeMillis();
        if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
            commit();
        }
    }

    @Override
    public void ack(Object msgId) {
        // Manager may be null if the partition was reassigned since emit.
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.ack(id.offset);
        }
    }

    @Override
    public void fail(Object msgId) {
        KafkaMessageId id = (KafkaMessageId) msgId;
        PartitionManager m = _coordinator.getManager(id.partition);
        if (m != null) {
            m.fail(id.offset);
        }
    }

    @Override
    public void deactivate() {
        // Flush offsets so a deactivated topology does not lose progress.
        commit();
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(_spoutConfig.scheme.getOutputFields());
    }

    // Writes each managed partition's last completed offset to ZooKeeper.
    private void commit() {
        _lastUpdateMs = System.currentTimeMillis();
        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
            manager.commit();
        }
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 0000000,0000000..0e7f601
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@@ -1,0 -1,0 +1,218 @@@
++package storm.kafka;
++
++import backtype.storm.metric.api.IMetric;
++import backtype.storm.utils.Utils;
++import com.google.common.base.Preconditions;
++import kafka.api.FetchRequest;
++import kafka.api.FetchRequestBuilder;
++import kafka.api.PartitionOffsetRequestInfo;
++import kafka.common.TopicAndPartition;
++import kafka.javaapi.FetchResponse;
++import kafka.javaapi.OffsetRequest;
++import kafka.javaapi.consumer.SimpleConsumer;
++import kafka.javaapi.message.ByteBufferMessageSet;
++import kafka.message.Message;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.trident.GlobalPartitionInformation;
++import storm.kafka.trident.IBrokerReader;
++import storm.kafka.trident.StaticBrokerReader;
++import storm.kafka.trident.ZkBrokerReader;
++
++import java.net.ConnectException;
++import java.nio.ByteBuffer;
++import java.util.*;
++
++
/**
 * Static helpers shared by the Kafka spouts: broker discovery, offset queries,
 * message fetching with error handling, tuple generation, and task/partition
 * assignment.
 */
public class KafkaUtils {

    public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
    // Sentinel returned by getOffset when the broker reports no offsets.
    private static final int NO_OFFSET = -5;


    /** Chooses a broker reader matching the configured host-discovery mode. */
    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
        if (conf.hosts instanceof StaticHosts) {
            return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
        } else {
            return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
        }
    }


    /**
     * Starting offset per the config: the configured startOffsetTime when
     * forceFromStart is set, otherwise the latest offset.
     */
    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
        long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
        if (config.forceFromStart) {
            startOffsetTime = config.startOffsetTime;
        }
        return getOffset(consumer, topic, partition, startOffsetTime);
    }

    /**
     * Asks the broker for the single offset before the given time for this
     * topic/partition; returns NO_OFFSET when the broker returns none.
     */
    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
        OffsetRequest request = new OffsetRequest(
                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());

        long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
        if (offsets.length > 0) {
            return offsets[0];
        } else {
            return NO_OFFSET;
        }
    }

    /**
     * Metric reporting, per partition and in total, the spout's lag behind
     * Kafka's latest offset plus the earliest/latest/last-emitted offsets.
     */
    public static class KafkaOffsetMetric implements IMetric {
        Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
        Set<Partition> _partitions;
        String _topic;
        DynamicPartitionConnections _connections;

        public KafkaOffsetMetric(String topic, DynamicPartitionConnections connections) {
            _topic = topic;
            _connections = connections;
        }

        public void setLatestEmittedOffset(Partition partition, long offset) {
            _partitionToOffset.put(partition, offset);
        }

        @Override
        public Object getValueAndReset() {
            try {
                long totalSpoutLag = 0;
                long totalEarliestTimeOffset = 0;
                long totalLatestTimeOffset = 0;
                long totalLatestEmittedOffset = 0;
                HashMap ret = new HashMap();
                // Only report when every known partition has an emitted offset;
                // otherwise the totals would be misleading.
                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
                        Partition partition = e.getKey();
                        SimpleConsumer consumer = _connections.getConnection(partition);
                        if (consumer == null) {
                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                            return null;
                        }
                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
                        long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                        if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
                            LOG.warn("No data found in Kafka Partition " + partition.getId());
                            return null;
                        }
                        long latestEmittedOffset = e.getValue();
                        long spoutLag = latestTimeOffset - latestEmittedOffset;
                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
                        ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
                        ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                        totalSpoutLag += spoutLag;
                        totalEarliestTimeOffset += earliestTimeOffset;
                        totalLatestTimeOffset += latestTimeOffset;
                        totalLatestEmittedOffset += latestEmittedOffset;
                    }
                    ret.put("totalSpoutLag", totalSpoutLag);
                    ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
                    ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
                    return ret;
                } else {
                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
                }
            } catch (Throwable t) {
                // Metrics must never take down the spout; log and skip this tick.
                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
            }
            return null;
        }

        // Replaces the known partition set and drops offsets for partitions
        // this task no longer owns.
        public void refreshPartitions(Set<Partition> partitions) {
            _partitions = partitions;
            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
            while (it.hasNext()) {
                if (!partitions.contains(it.next())) {
                    it.remove();
                }
            }
        }
    }

    /**
     * Fetches a batch of messages starting at the given offset. On an
     * OFFSET_OUT_OF_RANGE error (when configured) it retries once from the
     * configured start offset time; connection failures become
     * FailedFetchException so callers can refresh broker assignments.
     */
    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset) {
        ByteBufferMessageSet msgs = null;
        String topic = config.topic;
        int partitionId = partition.partition;
        // At most two attempts: the original fetch plus one offset-reset retry.
        for (int errors = 0; errors < 2 && msgs == null; errors++) {
            FetchRequestBuilder builder = new FetchRequestBuilder();
            FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
                    clientId(config.clientId).build();
            FetchResponse fetchResponse;
            try {
                fetchResponse = consumer.fetch(fetchRequest);
            } catch (Exception e) {
                if (e instanceof ConnectException) {
                    throw new FailedFetchException(e);
                } else {
                    throw new RuntimeException(e);
                }
            }
            if (fetchResponse.hasError()) {
                KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
                if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange && errors == 0) {
                    long startOffset = getOffset(consumer, topic, partitionId, config.startOffsetTime);
                    LOG.warn("Got fetch request with offset out of range: [" + offset + "]; " +
                            "retrying with default start offset time from configuration. " +
                            "configured start offset time: [" + config.startOffsetTime + "] offset: [" + startOffset + "]");
                    offset = startOffset;
                } else {
                    String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
                    LOG.error(message);
                    throw new FailedFetchException(message);
                }
            } else {
                msgs = fetchResponse.messageSet(topic, partitionId);
            }
        }
        return msgs;
    }


    /**
     * Deserializes a message into tuples, using key+value deserialization when
     * the scheme supports it and a key is present, value-only otherwise.
     */
    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg) {
        Iterable<List<Object>> tups;
        ByteBuffer payload = msg.payload();
        ByteBuffer key = msg.key();
        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(Utils.toByteArray(key), Utils.toByteArray(payload));
        } else {
            tups = kafkaConfig.scheme.deserialize(Utils.toByteArray(payload));
        }
        return tups;
    }


    /**
     * Round-robin assignment of partitions to tasks: task i takes partitions
     * i, i+totalTasks, i+2*totalTasks, ... of the ordered partition list.
     */
    public static List<Partition> calculatePartitionsForTask(GlobalPartitionInformation partitionInformation, int totalTasks, int taskIndex) {
        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less that total tasks");
        List<Partition> partitions = partitionInformation.getOrderedPartitions();
        int numPartitions = partitions.size();
        if (numPartitions < totalTasks) {
            LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
        }
        List<Partition> taskPartitions = new ArrayList<Partition>();
        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
            Partition taskPartition = partitions.get(i);
            taskPartitions.add(taskPartition);
        }
        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
        return taskPartitions;
    }

    // Logs the final task-to-partition assignment for operator visibility.
    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
        String taskPrefix = taskId(taskIndex, totalTasks);
        if (taskPartitions.isEmpty()) {
            LOG.warn(taskPrefix + "no partitions assigned");
        } else {
            LOG.info(taskPrefix + "assigned " + taskPartitions);
        }
    }

    /** Human-readable 1-based task label, e.g. "Task [2/4] ". */
    public static String taskId(int taskIndex, int totalTasks) {
        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
index 0000000,0000000..df31cb8
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueScheme.java
@@@ -1,0 -1,0 +1,11 @@@
++package storm.kafka;
++
++import backtype.storm.spout.Scheme;
++
++import java.util.List;
++
/**
 * A Scheme that deserializes a Kafka message's key and value together into a
 * single tuple, instead of the value alone.
 */
public interface KeyValueScheme extends Scheme {

    /** Returns the tuple for one key/value pair; implementations may return null to skip. */
    public List<Object> deserializeKeyAndValue(byte[] key, byte[] value);

}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
index 0000000,0000000..2412a1c
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@@ -1,0 -1,0 +1,19 @@@
++package storm.kafka;
++
++import backtype.storm.spout.SchemeAsMultiScheme;
++import java.util.Arrays;
++import java.util.List;
++
++public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme{
++
++ public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
++ super(scheme);
++ }
++
++ public Iterable<List<Object>> deserializeKeyAndValue(final byte[] key, final byte[] value) {
++ List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value);
++ if(o == null) return null;
++ else return Arrays.asList(o);
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/Partition.java
index 0000000,0000000..96a3ad7
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/Partition.java
@@@ -1,0 -1,0 +1,47 @@@
++package storm.kafka;
++
++import com.google.common.base.Objects;
++import storm.trident.spout.ISpoutPartition;
++
++
++public class Partition implements ISpoutPartition {
++
++ public final Broker host;
++ public final int partition;
++
++ public Partition(Broker host, int partition) {
++ this.host = host;
++ this.partition = partition;
++ }
++
++ @Override
++ public int hashCode() {
++ return Objects.hashCode(host, partition);
++ }
++
++ @Override
++ public boolean equals(Object obj) {
++ if (this == obj) {
++ return true;
++ }
++ if (obj == null || getClass() != obj.getClass()) {
++ return false;
++ }
++ final Partition other = (Partition) obj;
++ return Objects.equal(this.host, other.host) && Objects.equal(this.partition, other.partition);
++ }
++
++ @Override
++ public String toString() {
++ return "Partition{" +
++ "host=" + host +
++ ", partition=" + partition +
++ '}';
++ }
++
++ @Override
++ public String getId() {
++ return "partition_" + partition;
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
index 0000000,0000000..d28248d
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionCoordinator.java
@@@ -1,0 -1,0 +1,9 @@@
++package storm.kafka;
++
++import java.util.List;
++
/** Assigns Kafka partitions to this spout task and exposes their managers. */
public interface PartitionCoordinator {
    /** The PartitionManagers for every partition this task currently owns. */
    List<PartitionManager> getMyManagedPartitions();

    /** The manager for one partition; used when acking/failing by message id. */
    PartitionManager getManager(Partition partition);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 0000000,0000000..03075bb
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@@ -1,0 -1,0 +1,224 @@@
++package storm.kafka;
++
++import backtype.storm.Config;
++import backtype.storm.metric.api.CombinedMetric;
++import backtype.storm.metric.api.CountMetric;
++import backtype.storm.metric.api.MeanReducer;
++import backtype.storm.metric.api.ReducedMetric;
++import backtype.storm.spout.SpoutOutputCollector;
++import com.google.common.collect.ImmutableMap;
++import kafka.api.OffsetRequest;
++import kafka.javaapi.consumer.SimpleConsumer;
++import kafka.javaapi.message.ByteBufferMessageSet;
++import kafka.message.MessageAndOffset;
++import org.slf4j.Logger;
++import org.slf4j.LoggerFactory;
++import storm.kafka.KafkaSpout.EmitState;
++import storm.kafka.KafkaSpout.MessageAndRealOffset;
++import storm.kafka.trident.MaxMetric;
++
++import java.util.*;
++
/**
 * Manages consumption of a single Kafka partition for the spout: fetches
 * batches, buffers messages for emission, tracks pending (un-acked) offsets,
 * and commits progress to ZooKeeper.
 */
public class PartitionManager {
    public static final Logger LOG = LoggerFactory.getLogger(PartitionManager.class);
    private final CombinedMetric _fetchAPILatencyMax;
    private final ReducedMetric _fetchAPILatencyMean;
    private final CountMetric _fetchAPICallCount;
    private final CountMetric _fetchAPIMessageCount;

    /** Storm message id carrying the partition and offset, for ack/fail routing. */
    static class KafkaMessageId {
        public Partition partition;
        public long offset;

        public KafkaMessageId(Partition partition, long offset) {
            this.partition = partition;
            this.offset = offset;
        }
    }

    // Next offset to fetch from Kafka.
    Long _emittedToOffset;
    // Offsets emitted but not yet acked, in ascending order.
    SortedSet<Long> _pending = new TreeSet<Long>();
    // Last offset written to ZooKeeper.
    Long _committedTo;
    // Messages fetched but not yet emitted.
    LinkedList<MessageAndRealOffset> _waitingToEmit = new LinkedList<MessageAndRealOffset>();
    Partition _partition;
    SpoutConfig _spoutConfig;
    String _topologyInstanceId;
    SimpleConsumer _consumer;
    DynamicPartitionConnections _connections;
    ZkState _state;
    Map _stormConf;


    public PartitionManager(DynamicPartitionConnections connections, String topologyInstanceId, ZkState state, Map stormConf, SpoutConfig spoutConfig, Partition id) {
        _partition = id;
        _connections = connections;
        _spoutConfig = spoutConfig;
        _topologyInstanceId = topologyInstanceId;
        _consumer = connections.register(id.host, id.partition);
        _state = state;
        _stormConf = stormConf;

        // Recover the previously committed offset (and owning topology id) from ZooKeeper.
        String jsonTopologyId = null;
        Long jsonOffset = null;
        String path = committedPath();
        try {
            Map<Object, Object> json = _state.readJSON(path);
            LOG.info("Read partition information from: " + path + "  --> " + json );
            if (json != null) {
                jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                jsonOffset = (Long) json.get("offset");
            }
        } catch (Throwable e) {
            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
        }

        // Decide the starting offset: config-driven when no/invalid state or on
        // a forced restart under a new topology id, otherwise resume from ZK.
        if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig);
            LOG.info("No partition information found, using configuration to determine offset");
        } else if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
            _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
            LOG.info("Topology change detected and reset from start forced, using configuration to determine offset");
        } else {
            _committedTo = jsonOffset;
            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
        }

        LOG.info("Starting " + _partition + " from offset " + _committedTo);
        _emittedToOffset = _committedTo;

        _fetchAPILatencyMax = new CombinedMetric(new MaxMetric());
        _fetchAPILatencyMean = new ReducedMetric(new MeanReducer());
        _fetchAPICallCount = new CountMetric();
        _fetchAPIMessageCount = new CountMetric();
    }

    /** Snapshot-and-reset of this partition's fetch-API metrics, keyed by partition. */
    public Map getMetricsDataMap() {
        Map ret = new HashMap();
        ret.put(_partition + "/fetchAPILatencyMax", _fetchAPILatencyMax.getValueAndReset());
        ret.put(_partition + "/fetchAPILatencyMean", _fetchAPILatencyMean.getValueAndReset());
        ret.put(_partition + "/fetchAPICallCount", _fetchAPICallCount.getValueAndReset());
        ret.put(_partition + "/fetchAPIMessageCount", _fetchAPIMessageCount.getValueAndReset());
        return ret;
    }

    //returns false if it's reached the end of current batch
    public EmitState next(SpoutOutputCollector collector) {
        if (_waitingToEmit.isEmpty()) {
            fill();
        }
        // Skip (and self-ack) messages that deserialize to no tuples until one
        // actually emits or the buffer runs dry.
        while (true) {
            MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
            if (toEmit == null) {
                return EmitState.NO_EMITTED;
            }
            Iterable<List<Object>> tups = KafkaUtils.generateTuples(_spoutConfig, toEmit.msg);
            if (tups != null) {
                for (List<Object> tup : tups) {
                    collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
                }
                break;
            } else {
                ack(toEmit.offset);
            }
        }
        if (!_waitingToEmit.isEmpty()) {
            return EmitState.EMITTED_MORE_LEFT;
        } else {
            return EmitState.EMITTED_END;
        }
    }

    // Fetches the next batch from Kafka into _waitingToEmit, recording each
    // message's offset as pending and advancing _emittedToOffset.
    private void fill() {
        long start = System.nanoTime();
        ByteBufferMessageSet msgs = KafkaUtils.fetchMessages(_spoutConfig, _consumer, _partition, _emittedToOffset);
        long end = System.nanoTime();
        long millis = (end - start) / 1000000;
        _fetchAPILatencyMax.update(millis);
        _fetchAPILatencyMean.update(millis);
        _fetchAPICallCount.incr();
        int numMessages = countMessages(msgs);
        _fetchAPIMessageCount.incrBy(numMessages);

        if (numMessages > 0) {
            LOG.info("Fetched " + numMessages + " messages from: " + _partition);
        }
        for (MessageAndOffset msg : msgs) {
            _pending.add(_emittedToOffset);
            _waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
            _emittedToOffset = msg.nextOffset();
        }
        if (numMessages > 0) {
            LOG.info("Added " + numMessages + " messages from: " + _partition + " to internal buffers");
        }
    }

    private int countMessages(ByteBufferMessageSet messageSet) {
        int counter = 0;
        for (MessageAndOffset messageAndOffset : messageSet) {
            counter = counter + 1;
        }
        return counter;
    }

    public void ack(Long offset) {
        _pending.remove(offset);
    }

    public void fail(Long offset) {
        //TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
        // things might get crazy with lots of timeouts
        // Rewind the fetch cursor to the failed offset; it and everything after
        // it (>= offset in _pending) will be re-fetched and re-emitted.
        if (_emittedToOffset > offset) {
            _emittedToOffset = offset;
            _pending.tailSet(offset).clear();
        }
    }

    /** Writes the last completed offset to ZooKeeper if it advanced since the previous commit. */
    public void commit() {
        long lastCompletedOffset = lastCompletedOffset();
        if (lastCompletedOffset != lastCommittedOffset()) {
            LOG.info("Writing last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
            Map<Object, Object> data = ImmutableMap.builder()
                    .put("topology", ImmutableMap.of("id", _topologyInstanceId,
                            "name", _stormConf.get(Config.TOPOLOGY_NAME)))
                    .put("offset", lastCompletedOffset)
                    .put("partition", _partition.partition)
                    .put("broker", ImmutableMap.of("host", _partition.host.host,
                            "port", _partition.host.port))
                    .put("topic", _spoutConfig.topic).build();
            _state.writeJSON(committedPath(), data);
            _committedTo = lastCompletedOffset;
            LOG.info("Wrote last completed offset (" + lastCompletedOffset + ") to ZK for " + _partition + " for topology: " + _topologyInstanceId);
        } else {
            LOG.info("No new offset for " + _partition + " for topology: " + _topologyInstanceId);
        }
    }

    // ZooKeeper node where this partition's offset is committed.
    private String committedPath() {
        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
    }

    public long queryPartitionOffsetLatestTime() {
        return KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition,
                OffsetRequest.LatestTime());
    }

    public long lastCommittedOffset() {
        return _committedTo;
    }

    /** Everything before this offset has been emitted and acked (or nothing is pending). */
    public long lastCompletedOffset() {
        if (_pending.isEmpty()) {
            return _emittedToOffset;
        } else {
            return _pending.first();
        }
    }

    public Partition getPartition() {
        return _partition;
    }

    public void close() {
        _connections.unregister(_partition.host, _partition.partition);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
index 0000000,0000000..05551ec
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/SpoutConfig.java
@@@ -1,0 -1,0 +1,19 @@@
++package storm.kafka;
++
++import java.io.Serializable;
++import java.util.List;
++
++
/**
 * KafkaConfig extended with the ZooKeeper settings the core (non-trident)
 * spout needs to store consumed offsets.
 */
public class SpoutConfig extends KafkaConfig implements Serializable {
    // ZooKeeper ensemble for offset storage; when null, KafkaSpout.open falls
    // back to the topology's storm.zookeeper.* settings.
    public List<String> zkServers = null;
    public Integer zkPort = null;
    // Root path and consumer id under which offsets are committed
    // (see PartitionManager.committedPath: zkRoot/id/partitionId).
    public String zkRoot = null;
    public String id = null;
    // How often (ms) offsets are written back to ZooKeeper.
    public long stateUpdateIntervalMs = 2000;

    public SpoutConfig(BrokerHosts hosts, String topic, String zkRoot, String id) {
        super(hosts, topic);
        this.zkRoot = zkRoot;
        this.id = id;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
index 0000000,0000000..040060c
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StaticCoordinator.java
@@@ -1,0 -1,0 +1,31 @@@
++package storm.kafka;
++
++import java.util.ArrayList;
++import java.util.HashMap;
++import java.util.List;
++import java.util.Map;
++
++
++public class StaticCoordinator implements PartitionCoordinator {
++ Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
++ List<PartitionManager> _allManagers = new ArrayList();
++
++ public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
++ StaticHosts hosts = (StaticHosts) config.hosts;
++ List<Partition> myPartitions = KafkaUtils.calculatePartitionsForTask(hosts.getPartitionInformation(), totalTasks, taskIndex);
++ for (Partition myPartition : myPartitions) {
++ _managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
++ }
++ _allManagers = new ArrayList(_managers.values());
++ }
++
++ @Override
++ public List<PartitionManager> getMyManagedPartitions() {
++ return _allManagers;
++ }
++
++ public PartitionManager getManager(Partition partition) {
++ return _managers.get(partition);
++ }
++
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java
index 0000000,0000000..9ed7193
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StaticHosts.java
@@@ -1,0 -1,0 +1,21 @@@
++package storm.kafka;
++
++import storm.kafka.trident.GlobalPartitionInformation;
++
++/**
++ * Date: 11/05/2013
++ * Time: 14:43
++ */
++public class StaticHosts implements BrokerHosts {
++
++
++ private GlobalPartitionInformation partitionInformation;
++
++ public StaticHosts(GlobalPartitionInformation partitionInformation) {
++ this.partitionInformation = partitionInformation;
++ }
++
++ public GlobalPartitionInformation getPartitionInformation() {
++ return partitionInformation;
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java
index 0000000,0000000..a9b9db1
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StaticPartitionConnections.java
@@@ -1,0 -1,0 +1,35 @@@
package storm.kafka;

import kafka.javaapi.consumer.SimpleConsumer;

import java.util.HashMap;
import java.util.Map;

/**
 * Lazily creates and caches one {@link SimpleConsumer} per partition for a
 * statically configured set of Kafka brokers.
 */
public class StaticPartitionConnections {
    Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, SimpleConsumer>();
    KafkaConfig _config;
    StaticHosts hosts;

    public StaticPartitionConnections(KafkaConfig conf) {
        _config = conf;
        if (!(conf.hosts instanceof StaticHosts)) {
            throw new RuntimeException("Must configure with static hosts");
        }
        this.hosts = (StaticHosts) conf.hosts;
    }

    /** Returns the cached consumer for this partition, creating it on first use. */
    public SimpleConsumer getConsumer(int partition) {
        SimpleConsumer consumer = _kafka.get(partition);
        if (consumer == null) {
            Broker broker = hosts.getPartitionInformation().getBrokerFor(partition);
            consumer = new SimpleConsumer(broker.host, broker.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId);
            _kafka.put(partition, consumer);
        }
        return consumer;
    }

    /** Closes every consumer opened so far. */
    public void close() {
        for (SimpleConsumer consumer : _kafka.values()) {
            consumer.close();
        }
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
index 0000000,0000000..a6adddb
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringKeyValueScheme.java
@@@ -1,0 -1,0 +1,20 @@@
package storm.kafka;

import backtype.storm.tuple.Values;
import com.google.common.collect.ImmutableMap;

import java.util.List;

/**
 * {@link KeyValueScheme} that decodes key and value as UTF-8 strings. A keyed
 * message is emitted as a single-entry map of key to value; a message without
 * a key falls back to plain {@link StringScheme} deserialization.
 */
public class StringKeyValueScheme extends StringScheme implements KeyValueScheme {

    @Override
    public List<Object> deserializeKeyAndValue(byte[] key, byte[] value) {
        if (key != null) {
            return new Values(ImmutableMap.of(
                    StringScheme.deserializeString(key),
                    StringScheme.deserializeString(value)));
        }
        return deserialize(value);
    }

}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
index 0000000,0000000..a809448
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/StringScheme.java
@@@ -1,0 -1,0 +1,29 @@@
package storm.kafka;

import backtype.storm.spout.Scheme;
import backtype.storm.tuple.Fields;
import backtype.storm.tuple.Values;

import java.nio.charset.Charset;
import java.util.List;

/**
 * {@link Scheme} that deserializes a Kafka message as a single UTF-8 string
 * field named {@link #STRING_SCHEME_KEY}.
 */
public class StringScheme implements Scheme {

    public static final String STRING_SCHEME_KEY = "str";

    // UTF-8 is guaranteed by the JVM spec, so the Charset overload lets us
    // avoid the checked UnsupportedEncodingException of the String-name API.
    private static final Charset UTF8 = Charset.forName("UTF-8");

    public List<Object> deserialize(byte[] bytes) {
        return new Values(deserializeString(bytes));
    }

    /** Decodes the raw message bytes as a UTF-8 string. */
    public static String deserializeString(byte[] string) {
        return new String(string, UTF8);
    }

    public Fields getOutputFields() {
        return new Fields(STRING_SCHEME_KEY);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
index 0000000,0000000..ec35aed
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkCoordinator.java
@@@ -1,0 -1,0 +1,95 @@@
package storm.kafka;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import storm.kafka.trident.GlobalPartitionInformation;

import java.util.*;

import static storm.kafka.KafkaUtils.taskId;

/**
 * {@link PartitionCoordinator} that discovers brokers/partitions from
 * ZooKeeper and periodically refreshes this task's set of
 * {@link PartitionManager}s as brokers come and go.
 */
public class ZkCoordinator implements PartitionCoordinator {
    public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);

    SpoutConfig _spoutConfig;
    int _taskIndex;
    int _totalTasks;
    String _topologyInstanceId;
    Map<Partition, PartitionManager> _managers = new HashMap<Partition, PartitionManager>();
    List<PartitionManager> _cachedList;
    Long _lastRefreshTime = null; // null means "never refreshed yet"
    int _refreshFreqMs;
    DynamicPartitionConnections _connections;
    DynamicBrokersReader _reader;
    ZkState _state;
    Map _stormConf;

    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
        this(connections, stormConf, spoutConfig, state, taskIndex, totalTasks, topologyInstanceId, buildReader(stormConf, spoutConfig));
    }

    /** Overload taking an explicit reader; useful for injecting a fake in tests. */
    public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId, DynamicBrokersReader reader) {
        _spoutConfig = spoutConfig;
        _connections = connections;
        _taskIndex = taskIndex;
        _totalTasks = totalTasks;
        _topologyInstanceId = topologyInstanceId;
        _stormConf = stormConf;
        _state = state;
        ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
        _refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
        _reader = reader;
    }

    private static DynamicBrokersReader buildReader(Map stormConf, SpoutConfig spoutConfig) {
        ZkHosts hosts = (ZkHosts) spoutConfig.hosts;
        return new DynamicBrokersReader(stormConf, hosts.brokerZkStr, hosts.brokerZkPath, spoutConfig.topic);
    }

    @Override
    public List<PartitionManager> getMyManagedPartitions() {
        // Refresh lazily, at most once per _refreshFreqMs.
        if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
            refresh();
            _lastRefreshTime = System.currentTimeMillis();
        }
        return _cachedList;
    }

    /** Re-reads broker info from ZooKeeper and diffs it against the current managers. */
    void refresh() {
        try {
            LOG.info(taskId(_taskIndex, _totalTasks) + "Refreshing partition manager connections");
            GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
            List<Partition> mine = KafkaUtils.calculatePartitionsForTask(brokerInfo, _totalTasks, _taskIndex);

            // newPartitions = assigned now but not yet managed;
            // deletedPartitions = managed but no longer assigned.
            Set<Partition> curr = _managers.keySet();
            Set<Partition> newPartitions = new HashSet<Partition>(mine);
            newPartitions.removeAll(curr);

            Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
            deletedPartitions.removeAll(mine);

            LOG.info(taskId(_taskIndex, _totalTasks) + "Deleted partition managers: " + deletedPartitions.toString());

            for (Partition id : deletedPartitions) {
                PartitionManager man = _managers.remove(id);
                man.close();
            }
            LOG.info(taskId(_taskIndex, _totalTasks) + "New partition managers: " + newPartitions.toString());

            for (Partition id : newPartitions) {
                PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
                _managers.put(id, man);
            }

        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        _cachedList = new ArrayList<PartitionManager>(_managers.values());
        LOG.info(taskId(_taskIndex, _totalTasks) + "Finished refreshing");
    }

    @Override
    public PartitionManager getManager(Partition partition) {
        return _managers.get(partition);
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
index 0000000,0000000..f2e0fc2
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkHosts.java
@@@ -1,0 -1,0 +1,22 @@@
++package storm.kafka;
++
++/**
++ * Date: 11/05/2013
++ * Time: 14:38
++ */
++public class ZkHosts implements BrokerHosts {
++ private static final String DEFAULT_ZK_PATH = "/brokers";
++
++ public String brokerZkStr = null;
++ public String brokerZkPath = null; // e.g., /kafka/brokers
++ public int refreshFreqSecs = 60;
++
++ public ZkHosts(String brokerZkStr, String brokerZkPath) {
++ this.brokerZkStr = brokerZkStr;
++ this.brokerZkPath = brokerZkPath;
++ }
++
++ public ZkHosts(String brokerZkStr) {
++ this(brokerZkStr, DEFAULT_ZK_PATH);
++ }
++}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/ZkState.java
index 0000000,0000000..d5416af
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/ZkState.java
@@@ -1,0 -1,0 +1,99 @@@
package storm.kafka;

import backtype.storm.Config;
import backtype.storm.utils.Utils;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.retry.RetryNTimes;
import org.apache.zookeeper.CreateMode;
import org.json.simple.JSONValue;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.nio.charset.Charset;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Thin wrapper around a Curator client used to persist spout state
 * (e.g. consumer offsets) in ZooKeeper as JSON.
 */
public class ZkState {
    public static final Logger LOG = LoggerFactory.getLogger(ZkState.class);
    CuratorFramework _curator;

    public ZkState(Map stateConf) {
        stateConf = new HashMap(stateConf);

        try {
            _curator = newCurator(stateConf);
            _curator.start();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    private CuratorFramework newCurator(Map stateConf) throws Exception {
        // Build a "host:port,host:port," connect string from the configured servers.
        Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
        StringBuilder connectString = new StringBuilder();
        for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
            connectString.append(server).append(":").append(port).append(",");
        }
        return CuratorFrameworkFactory.newClient(connectString.toString(),
                Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
                15000,
                new RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
                        Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
    }

    public CuratorFramework getCurator() {
        assert _curator != null;
        return _curator;
    }

    /** Serializes the map as JSON and writes it (UTF-8) to the given path. */
    public void writeJSON(String path, Map<Object, Object> data) {
        LOG.info("Writing " + path + " the data " + data.toString());
        writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
    }

    /** Writes bytes at the path, creating it (with parents) if missing. */
    public void writeBytes(String path, byte[] bytes) {
        try {
            boolean exists = _curator.checkExists().forPath(path) != null;
            if (exists) {
                _curator.setData().forPath(path, bytes);
            } else {
                _curator.create()
                        .creatingParentsIfNeeded()
                        .withMode(CreateMode.PERSISTENT)
                        .forPath(path, bytes);
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Reads and parses the JSON at the path, or returns null if absent. */
    public Map<Object, Object> readJSON(String path) {
        try {
            byte[] raw = readBytes(path);
            return raw == null ? null : (Map<Object, Object>) JSONValue.parse(new String(raw, "UTF-8"));
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    /** Returns the raw bytes at the path, or null if the node does not exist. */
    public byte[] readBytes(String path) {
        try {
            return _curator.checkExists().forPath(path) == null
                    ? null
                    : _curator.getData().forPath(path);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public void close() {
        _curator.close();
        _curator = null;
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
index 0000000,0000000..89969d9
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java
@@@ -1,0 -1,0 +1,72 @@@
package storm.kafka.bolt;

import backtype.storm.task.OutputCollector;
import backtype.storm.task.TopologyContext;
import backtype.storm.topology.OutputFieldsDeclarer;
import backtype.storm.topology.base.BaseRichBolt;
import backtype.storm.tuple.Tuple;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.Map;
import java.util.Properties;


/**
 * Bolt implementation that can send Tuple data to Kafka
 * <p/>
 * It expects the producer configuration and topic in storm config under
 * <p/>
 * 'kafka.broker.properties' and 'topic'
 * <p/>
 * respectively.
 */
public class KafkaBolt<K, V> extends BaseRichBolt {

    private static final Logger LOG = LoggerFactory.getLogger(KafkaBolt.class);

    public static final String TOPIC = "topic";
    public static final String KAFKA_BROKER_PROPERTIES = "kafka.broker.properties";

    public static final String BOLT_KEY = "key";
    public static final String BOLT_MESSAGE = "message";

    private Producer<K, V> producer;
    private OutputCollector collector;
    private String topic;

    @Override
    public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
        Properties properties = new Properties();
        properties.putAll(configMap);
        ProducerConfig config = new ProducerConfig(properties);
        producer = new Producer<K, V>(config);
        this.topic = (String) stormConf.get(TOPIC);
        this.collector = collector;
    }

    @Override
    public void execute(Tuple input) {
        // The "key" field is optional; Kafka accepts a null message key.
        K key = null;
        if (input.contains(BOLT_KEY)) {
            key = (K) input.getValueByField(BOLT_KEY);
        }
        V message = (V) input.getValueByField(BOLT_MESSAGE);
        try {
            producer.send(new KeyedMessage<K, V>(topic, key, message));
        } catch (Exception ex) {
            LOG.error("Could not send message with key '" + key + "' and value '" + message + "'", ex);
        } finally {
            // Always ack: a failed send is logged as best-effort, not replayed.
            collector.ack(input);
        }
    }

    @Override
    public void cleanup() {
        // Release the producer's connections/threads on topology shutdown;
        // without this the bolt leaks the Kafka producer.
        if (producer != null) {
            producer.close();
        }
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        // This bolt is a sink; it emits no tuples.
    }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/7d1bf2a9/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --cc external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java
index 0000000,0000000..f67acaa
new file mode 100644
--- /dev/null
+++ b/external/storm-kafka/src/jvm/storm/kafka/trident/Coordinator.java
@@@ -1,0 -1,0 +1,37 @@@
package storm.kafka.trident;

import storm.kafka.KafkaUtils;
import storm.trident.spout.IOpaquePartitionedTridentSpout;
import storm.trident.spout.IPartitionedTridentSpout;

import java.util.Map;

/**
 * Trident coordinator shared by the transactional and opaque Kafka spouts.
 * Delegates readiness/close decisions to the configured batch coordinator and
 * asks the broker reader for the current partition layout for each batch.
 */
class Coordinator implements IPartitionedTridentSpout.Coordinator<GlobalPartitionInformation>, IOpaquePartitionedTridentSpout.Coordinator<GlobalPartitionInformation> {

    private final IBrokerReader reader;
    private final TridentKafkaConfig config;

    public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
        this.config = tridentKafkaConfig;
        this.reader = KafkaUtils.makeBrokerReader(conf, tridentKafkaConfig);
    }

    @Override
    public void close() {
        config.coordinator.close();
    }

    @Override
    public boolean isReady(long txid) {
        return config.coordinator.isReady(txid);
    }

    @Override
    public GlobalPartitionInformation getPartitionsForBatch() {
        return reader.getCurrentBrokers();
    }
}