You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by bo...@apache.org on 2016/01/11 21:57:36 UTC
[40/53] [abbrv] [partial] storm git commit: STORM-1202: Migrate APIs
to org.apache.storm, but try to provide some form of backwards compatibility
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
index b76e230..d2ca5b8 100644
--- a/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
+++ b/external/storm-jdbc/src/main/java/org/apache/storm/jdbc/trident/state/JdbcUpdater.java
@@ -17,9 +17,9 @@
*/
package org.apache.storm.jdbc.trident.state;
-import storm.trident.operation.TridentCollector;
-import storm.trident.state.BaseStateUpdater;
-import storm.trident.tuple.TridentTuple;
+import org.apache.storm.trident.operation.TridentCollector;
+import org.apache.storm.trident.state.BaseStateUpdater;
+import org.apache.storm.trident.tuple.TridentTuple;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java
index 1fda3b1..9a5ec09 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/bolt/JdbcLookupBoltTest.java
@@ -17,7 +17,7 @@
*/
package org.apache.storm.jdbc.bolt;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.tuple.Fields;
import com.google.common.collect.Lists;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.common.ConnectionProvider;
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
index 718917a..fdcd053 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/spout/UserSpout.java
@@ -17,12 +17,12 @@
*/
package org.apache.storm.jdbc.spout;
-import backtype.storm.spout.SpoutOutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.IRichSpout;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Values;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.IRichSpout;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.tuple.Fields;
+import org.apache.storm.tuple.Values;
import com.google.common.collect.Lists;
import java.util.*;
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
index 9df5a86..ec7ca36 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/AbstractUserTopology.java
@@ -17,10 +17,10 @@
*/
package org.apache.storm.jdbc.topology;
-import backtype.storm.Config;
-import backtype.storm.StormSubmitter;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.Config;
+import org.apache.storm.StormSubmitter;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.storm.jdbc.common.Column;
@@ -32,7 +32,7 @@ import org.apache.storm.jdbc.mapper.JdbcLookupMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcMapper;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
import org.apache.storm.jdbc.spout.UserSpout;
-import backtype.storm.LocalCluster;
+import org.apache.storm.LocalCluster;
import java.sql.Types;
import java.util.List;
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
index 585994e..1915219 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTopology.java
@@ -17,8 +17,8 @@
*/
package org.apache.storm.jdbc.topology;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.topology.TopologyBuilder;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.topology.TopologyBuilder;
import com.google.common.collect.Lists;
import org.apache.storm.jdbc.bolt.JdbcInsertBolt;
import org.apache.storm.jdbc.bolt.JdbcLookupBolt;
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
----------------------------------------------------------------------
diff --git a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
index 522d41a..11269c3 100644
--- a/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
+++ b/external/storm-jdbc/src/test/java/org/apache/storm/jdbc/topology/UserPersistanceTridentTopology.java
@@ -17,8 +17,8 @@
*/
package org.apache.storm.jdbc.topology;
-import backtype.storm.generated.StormTopology;
-import backtype.storm.tuple.Fields;
+import org.apache.storm.generated.StormTopology;
+import org.apache.storm.tuple.Fields;
import com.google.common.collect.Lists;
import org.apache.storm.jdbc.common.Column;
import org.apache.storm.jdbc.mapper.SimpleJdbcLookupMapper;
@@ -27,9 +27,9 @@ import org.apache.storm.jdbc.trident.state.JdbcQuery;
import org.apache.storm.jdbc.trident.state.JdbcState;
import org.apache.storm.jdbc.trident.state.JdbcStateFactory;
import org.apache.storm.jdbc.trident.state.JdbcUpdater;
-import storm.trident.Stream;
-import storm.trident.TridentState;
-import storm.trident.TridentTopology;
+import org.apache.storm.trident.Stream;
+import org.apache.storm.trident.TridentState;
+import org.apache.storm.trident.TridentTopology;
import java.sql.Types;
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/README.md
----------------------------------------------------------------------
diff --git a/external/storm-kafka/README.md b/external/storm-kafka/README.md
index 3dfa0b7..3a86cf0 100644
--- a/external/storm-kafka/README.md
+++ b/external/storm-kafka/README.md
@@ -65,7 +65,7 @@ In addition to these parameters, SpoutConfig contains the following fields that
// Exponential back-off retry settings. These are used when retrying messages after a bolt
// calls OutputCollector.fail().
- // Note: be sure to set backtype.storm.Config.MESSAGE_TIMEOUT_SECS appropriately to prevent
+ // Note: be sure to set org.apache.storm.Config.TOPOLOGY_MESSAGE_TIMEOUT_SECS appropriately to prevent
// resubmitting the message while still retrying.
public long retryInitialDelayMs = 0;
public double retryDelayMultiplier = 1.0;
@@ -190,9 +190,9 @@ use Kafka 0.8.1.1 built against Scala 2.10, you would use the following dependen
Note that the ZooKeeper and log4j dependencies are excluded to prevent version conflicts with Storm's dependencies.
## Writing to Kafka as part of your topology
-You can create an instance of storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or if you
-are using trident you can use storm.kafka.trident.TridentState, storm.kafka.trident.TridentStateFactory and
-storm.kafka.trident.TridentKafkaUpdater.
+You can create an instance of org.apache.storm.kafka.bolt.KafkaBolt and attach it as a component to your topology or, if you
+are using Trident, you can use org.apache.storm.kafka.trident.TridentKafkaState, org.apache.storm.kafka.trident.TridentKafkaStateFactory and
+org.apache.storm.kafka.trident.TridentKafkaUpdater.
You need to provide implementations of the following 2 interfaces
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
new file mode 100644
index 0000000..0d95e8d
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Broker.java
@@ -0,0 +1,86 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import com.google.common.base.Objects;
+
+import java.io.Serializable;
+
+/**
+ * Network address of a broker: a host name plus a port.
+ *
+ * <p>Two instances are equal when both {@link #host} and {@link #port} are equal; the natural
+ * ordering sorts by host name first and then by port.
+ */
+public class Broker implements Serializable, Comparable<Broker> {
+    public String host;
+    public int port;
+
+    // No-arg constructor kept for Kryo compatibility.
+    private Broker() {
+
+    }
+
+    /**
+     * @param host broker host name or address
+     * @param port broker port
+     */
+    public Broker(String host, int port) {
+        this.host = host;
+        this.port = port;
+    }
+
+    /**
+     * Creates a broker address that uses port 9092.
+     *
+     * @param host broker host name or address
+     */
+    public Broker(String host) {
+        this(host, 9092);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(host, port);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final Broker other = (Broker) obj;
+        // Primitive comparison for the port avoids boxing; host may be null, so use Objects.equal.
+        return Objects.equal(this.host, other.host) && this.port == other.port;
+    }
+
+    /** Returns the address as {@code host:port}, the form accepted by {@link #fromString}. */
+    @Override
+    public String toString() {
+        return host + ":" + port;
+    }
+
+    /**
+     * Parses a {@code host} or {@code host:port} specification.
+     *
+     * @param host the specification; when it has no port part, port 9092 is used
+     * @return the parsed broker address
+     * @throws IllegalArgumentException if splitting on ':' yields more than two parts
+     * @throws NumberFormatException if the port part is not an integer
+     */
+    public static Broker fromString(String host) {
+        String[] spec = host.split(":");
+        if (spec.length == 1) {
+            return new Broker(spec[0]);
+        } else if (spec.length == 2) {
+            return new Broker(spec[0], Integer.parseInt(spec[1]));
+        } else {
+            throw new IllegalArgumentException("Invalid host specification: " + host);
+        }
+    }
+
+
+    /** Orders brokers by host name, then by port. */
+    @Override
+    public int compareTo(Broker o) {
+        if (this.host.equals(o.host)) {
+            // Integer.compare cannot overflow, unlike subtracting the two ports.
+            return Integer.compare(this.port, o.port);
+        }
+        return this.host.compareTo(o.host);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
new file mode 100644
index 0000000..13ba0a1
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/BrokerHosts.java
@@ -0,0 +1,25 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.io.Serializable;
+
+
+public interface BrokerHosts extends Serializable {
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
new file mode 100644
index 0000000..2a18a7f
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ByteBufferSerializer.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.utils.Utils;
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+/**
+ * Kafka {@link Serializer} that emits the bytes of a {@link ByteBuffer}.
+ */
+public class ByteBufferSerializer implements Serializer<ByteBuffer> {
+    /** No configuration is needed, so the supplied settings are ignored. */
+    @Override
+    public void configure(Map<String, ?> configs, boolean isKey) {
+
+    }
+
+    /** Holds no resources, so there is nothing to release. */
+    @Override
+    public void close() {
+
+    }
+
+    /**
+     * Converts the buffer into a byte array with {@code Utils.toByteArray}.
+     *
+     * @param topic topic the data is associated with (unused)
+     * @param data buffer to serialize; may be {@code null}
+     * @return the serialized bytes, or {@code null} when {@code data} is {@code null}
+     */
+    @Override
+    public byte[] serialize(String topic, ByteBuffer data) {
+        // A null value (e.g. a Kafka tombstone) is passed through as null instead of being
+        // handed to Utils.toByteArray.
+        if (data == null) {
+            return null;
+        }
+        return Utils.toByteArray(data);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
new file mode 100644
index 0000000..0fc85b3
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicBrokersReader.java
@@ -0,0 +1,213 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.storm.utils.Utils;
+import com.google.common.base.Preconditions;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.RetryNTimes;
+import org.json.simple.JSONValue;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+
+import java.io.UnsupportedEncodingException;
+import java.net.SocketTimeoutException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+/**
+ * Reads partition leadership and broker addresses from ZooKeeper through a Curator client.
+ *
+ * <p>Paths read, relative to the configured {@code zkPath}:
+ * <ul>
+ * <li>{@code /topics} - children are topic names (only listed when wildcard matching is on)</li>
+ * <li>{@code /topics/<topic>/partitions} - one child per partition</li>
+ * <li>{@code /topics/<topic>/partitions/<n>/state} - JSON whose {@code leader} field is a broker id</li>
+ * <li>{@code /ids/<brokerId>} - JSON holding the broker's {@code host} and {@code port}</li>
+ * </ul>
+ * When {@code kafka.topic.wildcard.match} is true in the config, the configured topic is used as a
+ * regular expression and matched against every child of {@code /topics}.
+ */
+public class DynamicBrokersReader {
+
+    public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
+
+    private CuratorFramework _curator;
+    private String _zkPath;
+    private String _topic;
+    private Boolean _isWildcardTopic;
+
+    /**
+     * Creates the reader and starts its Curator client.
+     *
+     * @param conf Storm configuration; must contain the ZooKeeper session timeout, connection
+     *             timeout, retry times and retry interval settings
+     * @param zkStr ZooKeeper connection string
+     * @param zkPath root path of the topic and broker metadata
+     * @param topic topic name, or a regular expression when wildcard matching is enabled
+     * @throws NullPointerException if an argument or a required config value is null
+     * @throws RuntimeException if creating or starting the Curator client fails
+     */
+    public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
+        // Check required parameters
+        Preconditions.checkNotNull(conf, "conf cannot be null");
+
+        validateConfig(conf);
+
+        Preconditions.checkNotNull(zkStr, "zkString cannot be null");
+        Preconditions.checkNotNull(zkPath, "zkPath cannot be null");
+        Preconditions.checkNotNull(topic, "topic cannot be null");
+
+        _zkPath = zkPath;
+        _topic = topic;
+        _isWildcardTopic = Utils.getBoolean(conf.get("kafka.topic.wildcard.match"), false);
+        try {
+            _curator = CuratorFrameworkFactory.newClient(
+                    zkStr,
+                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT)),
+                    new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+                            Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+            _curator.start();
+        } catch (Exception ex) {
+            LOG.error("Couldn't connect to zookeeper", ex);
+            throw new RuntimeException(ex);
+        }
+    }
+
+    /**
+     * Get all partitions with their current leaders.
+     *
+     * <p>If the node of a partition leader is missing under {@link #brokerPath()}, that is logged
+     * and the partition is left out of its topic's {@link GlobalPartitionInformation}.
+     *
+     * @return one entry per topic handled by this reader
+     * @throws SocketTimeoutException rethrown unchanged if raised while reading from ZooKeeper
+     * @throws RuntimeException wrapping any other failure, including a partition without a leader
+     */
+    public List<GlobalPartitionInformation> getBrokerInfo() throws SocketTimeoutException {
+        List<String> topics = getTopics();
+        List<GlobalPartitionInformation> partitions = new ArrayList<GlobalPartitionInformation>();
+
+        for (String topic : topics) {
+            GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation(topic, this._isWildcardTopic);
+            try {
+                int numPartitionsForTopic = getNumPartitions(topic);
+                String brokerInfoPath = brokerPath();
+                for (int partition = 0; partition < numPartitionsForTopic; partition++) {
+                    int leader = getLeaderFor(topic, partition);
+                    String path = brokerInfoPath + "/" + leader;
+                    try {
+                        byte[] brokerData = _curator.getData().forPath(path);
+                        Broker hp = getBrokerHost(brokerData);
+                        globalPartitionInformation.addPartition(partition, hp);
+                    } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
+                        LOG.error("Node {} does not exist ", path);
+                    }
+                }
+            } catch (SocketTimeoutException e) {
+                throw e;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+            LOG.info("Read partition info from zookeeper: {}", globalPartitionInformation);
+            partitions.add(globalPartitionInformation);
+        }
+        return partitions;
+    }
+
+    /** Counts the children of the topic's partitions node. */
+    private int getNumPartitions(String topic) {
+        try {
+            String topicBrokersPath = partitionPath(topic);
+            List<String> children = _curator.getChildren().forPath(topicBrokersPath);
+            return children.size();
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Returns the configured topic alone, or, in wildcard mode, every child of
+     * {@link #topicsPath()} that fully matches the configured topic as a regular expression.
+     */
+    private List<String> getTopics() {
+        List<String> topics = new ArrayList<String>();
+        if (!_isWildcardTopic) {
+            topics.add(_topic);
+            return topics;
+        } else {
+            try {
+                List<String> children = _curator.getChildren().forPath(topicsPath());
+                for (String t : children) {
+                    if (t.matches(_topic)) {
+                        LOG.info("Found matching topic {}", t);
+                        topics.add(t);
+                    }
+                }
+                return topics;
+            } catch (Exception e) {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    /** @return {@code <zkPath>/topics} */
+    public String topicsPath() {
+        return _zkPath + "/topics";
+    }
+
+    /** @return {@code <zkPath>/topics/<topic>/partitions} */
+    public String partitionPath(String topic) {
+        return topicsPath() + "/" + topic + "/partitions";
+    }
+
+    /** @return {@code <zkPath>/ids}, the parent of the per-broker nodes */
+    public String brokerPath() {
+        return _zkPath + "/ids";
+    }
+
+
+
+    /**
+     * Reads the leader broker id of one partition.
+     *
+     * get /brokers/topics/distributedTopic/partitions/1/state
+     * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
+     * @param topic topic name
+     * @param partition partition number
+     * @return the id of the leader broker
+     * @throws RuntimeException if the leader is -1 (no leader) or the state node cannot be read
+     */
+    @SuppressWarnings("unchecked") // JSON objects are parsed as untyped maps
+    private int getLeaderFor(String topic, long partition) {
+        try {
+            String topicBrokersPath = partitionPath(topic);
+            byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
+            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
+            int leader = ((Number) value.get("leader")).intValue();
+            if (leader == -1) {
+                throw new RuntimeException("No leader found for partition " + partition);
+            }
+            return leader;
+        } catch (RuntimeException e) {
+            throw e;
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /** Closes the Curator client. */
+    public void close() {
+        _curator.close();
+    }
+
+    /**
+     * Parses a broker node's JSON into a {@link Broker}.
+     *
+     * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
+     * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
+     *
+     * @param contents UTF-8 encoded JSON read from the broker node
+     * @return the broker's host and port
+     */
+    @SuppressWarnings("unchecked") // JSON objects are parsed as untyped maps
+    private Broker getBrokerHost(byte[] contents) {
+        try {
+            Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
+            String host = (String) value.get("host");
+            // Read as Number, like the leader id in getLeaderFor, rather than assuming a Long.
+            int port = ((Number) value.get("port")).intValue();
+            return new Broker(host, port);
+        } catch (UnsupportedEncodingException e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Validate required parameters in the input configuration Map.
+     *
+     * @param conf Storm configuration to check
+     * @throws NullPointerException naming the first missing ZooKeeper setting
+     */
+    private void validateConfig(final Map conf) {
+        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT),
+                "%s cannot be null", Config.STORM_ZOOKEEPER_SESSION_TIMEOUT);
+        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT),
+                "%s cannot be null", Config.STORM_ZOOKEEPER_CONNECTION_TIMEOUT);
+        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES),
+                "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_TIMES);
+        Preconditions.checkNotNull(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL),
+                "%s cannot be null", Config.STORM_ZOOKEEPER_RETRY_INTERVAL);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
new file mode 100644
index 0000000..6d30139
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/DynamicPartitionConnections.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import kafka.javaapi.consumer.SimpleConsumer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.IBrokerReader;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+
+/**
+ * Shares one {@link SimpleConsumer} per broker among all partitions registered against it.
+ *
+ * <p>A consumer is created when the first partition on a broker is registered and closed when the
+ * last partition on that broker is unregistered. The bookkeeping uses a plain {@link HashMap} and
+ * no synchronization.
+ */
+public class DynamicPartitionConnections {
+
+    public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
+
+    /** A consumer connected to one broker plus the "topic_partition" keys currently using it. */
+    static class ConnectionInfo {
+        SimpleConsumer consumer;
+        Set<String> partitions = new HashSet<String>();
+
+        public ConnectionInfo(SimpleConsumer consumer) {
+            this.consumer = consumer;
+        }
+    }
+
+    Map<Broker, ConnectionInfo> _connections = new HashMap<Broker, ConnectionInfo>();
+    KafkaConfig _config;
+    IBrokerReader _reader;
+
+    /**
+     * @param config supplies the socket timeout, buffer size and client id for new consumers
+     * @param brokerReader used to look up which broker hosts a partition
+     */
+    public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) {
+        _config = config;
+        _reader = brokerReader;
+    }
+
+    /**
+     * Registers a partition against the broker the broker reader currently reports for it.
+     *
+     * @return the consumer shared by all partitions on that broker
+     */
+    public SimpleConsumer register(Partition partition) {
+        Broker broker = _reader.getBrokerForTopic(partition.topic).getBrokerFor(partition.partition);
+        return register(broker, partition.topic, partition.partition);
+    }
+
+    /**
+     * Registers a topic partition against a broker, creating the broker's consumer if needed.
+     *
+     * @return the consumer shared by all partitions on {@code host}
+     */
+    public SimpleConsumer register(Broker host, String topic, int partition) {
+        ConnectionInfo info = _connections.get(host);
+        if (info == null) {
+            info = new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
+            _connections.put(host, info);
+        }
+        info.partitions.add(getHashKey(topic, partition));
+        return info.consumer;
+    }
+
+    /**
+     * @return the consumer for the partition's broker, or {@code null} if none is registered
+     */
+    public SimpleConsumer getConnection(Partition partition) {
+        ConnectionInfo info = _connections.get(partition.host);
+        return info == null ? null : info.consumer;
+    }
+
+    /**
+     * Removes a topic partition from a broker and closes that broker's consumer once no
+     * partitions remain. Unregistering from a broker with no connection only logs a warning.
+     */
+    public void unregister(Broker broker, String topic, int partition) {
+        ConnectionInfo info = _connections.get(broker);
+        if (info == null) {
+            // Nothing is registered for this broker (e.g. a repeated unregister); nothing to close.
+            LOG.warn("No connection to broker {} while unregistering {} partition {}", broker, topic, partition);
+            return;
+        }
+        info.partitions.remove(getHashKey(topic, partition));
+        if (info.partitions.isEmpty()) {
+            info.consumer.close();
+            _connections.remove(broker);
+        }
+    }
+
+    /** Unregisters a partition from the broker recorded in {@code partition.host}. */
+    public void unregister(Partition partition) {
+        unregister(partition.host, partition.topic, partition.partition);
+    }
+
+    /** Closes every consumer and forgets all registrations. */
+    public void clear() {
+        for (ConnectionInfo info : _connections.values()) {
+            info.consumer.close();
+        }
+        _connections.clear();
+    }
+
+    /** Builds the "topic_partition" key used to track registrations per broker. */
+    private String getHashKey(String topic, int partition) {
+        return topic + "_" + partition;
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
new file mode 100644
index 0000000..f86d624
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/ExponentialBackoffMsgRetryManager.java
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.util.Comparator;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * {@link FailedMsgRetryManager} that schedules retries of failed offsets with exponential back-off.
+ *
+ * <p>The n-th retry of an offset is scheduled
+ * {@code min(retryInitialDelayMs * retryDelayMultiplier^(n-1), retryDelayMaxMs)} milliseconds after
+ * the failure that triggered it. Due offsets are handed out earliest scheduled time first.
+ *
+ * <p>{@code records} is a {@link ConcurrentHashMap}, but the priority queue of waiting records is
+ * not synchronized.
+ */
+public class ExponentialBackoffMsgRetryManager implements FailedMsgRetryManager {
+
+    private final long retryInitialDelayMs;
+    private final double retryDelayMultiplier;
+    private final long retryDelayMaxMs;
+
+    // Records scheduled for retry but not yet started, ordered by retry time (earliest first).
+    private Queue<MessageRetryRecord> waiting = new PriorityQueue<MessageRetryRecord>(11, new RetryTimeComparator());
+    // Latest retry record for each offset that has failed and has not been acked or cleared.
+    private Map<Long,MessageRetryRecord> records = new ConcurrentHashMap<Long,MessageRetryRecord>();
+
+    /**
+     * @param retryInitialDelayMs delay before the first retry
+     * @param retryDelayMultiplier factor applied to the delay for each further retry
+     * @param retryDelayMaxMs upper bound on any single retry delay
+     */
+    public ExponentialBackoffMsgRetryManager(long retryInitialDelayMs, double retryDelayMultiplier, long retryDelayMaxMs) {
+        this.retryInitialDelayMs = retryInitialDelayMs;
+        this.retryDelayMultiplier = retryDelayMultiplier;
+        this.retryDelayMaxMs = retryDelayMaxMs;
+    }
+
+    /**
+     * Records a failure of {@code offset} and schedules its next retry. The first failure is retry
+     * number 1; every further failure increments the retry number and so lengthens the delay.
+     */
+    @Override
+    public void failed(Long offset) {
+        MessageRetryRecord oldRecord = this.records.get(offset);
+        MessageRetryRecord newRecord = oldRecord == null ?
+                new MessageRetryRecord(offset) :
+                oldRecord.createNextRetryRecord();
+        if (oldRecord != null) {
+            // Records are equal by offset: drop any still-queued earlier record so the queue holds
+            // at most one entry per offset (a duplicate would cause an extra retry later).
+            this.waiting.remove(oldRecord);
+        }
+        this.records.put(offset, newRecord);
+        this.waiting.add(newRecord);
+    }
+
+    /** Forgets all retry state for an offset that has been acked. */
+    @Override
+    public void acked(Long offset) {
+        MessageRetryRecord record = this.records.remove(offset);
+        if (record != null) {
+            this.waiting.remove(record);
+        }
+    }
+
+    /**
+     * Marks a scheduled retry as started by taking it off the waiting queue. The record itself is
+     * kept so that a later failure continues the back-off sequence.
+     *
+     * @throws IllegalStateException if the offset has no scheduled retry
+     */
+    @Override
+    public void retryStarted(Long offset) {
+        MessageRetryRecord record = this.records.get(offset);
+        if (record == null || !this.waiting.contains(record)) {
+            throw new IllegalStateException("cannot retry a message that has not failed");
+        } else {
+            this.waiting.remove(record);
+        }
+    }
+
+    /**
+     * @return the offset with the earliest retry time if that time has been reached, else {@code null}
+     */
+    @Override
+    public Long nextFailedMessageToRetry() {
+        while (!this.waiting.isEmpty()) {
+            MessageRetryRecord first = this.waiting.peek();
+            if (System.currentTimeMillis() < first.retryTimeUTC) {
+                return null;
+            }
+            if (this.records.containsKey(first.offset)) {
+                return first.offset;
+            }
+            // defensive programming - should be impossible
+            this.waiting.remove(first);
+        }
+        return null;
+    }
+
+    /** @return whether {@code offset} is waiting for a retry whose time has been reached */
+    @Override
+    public boolean shouldRetryMsg(Long offset) {
+        MessageRetryRecord record = this.records.get(offset);
+        return record != null &&
+                this.waiting.contains(record) &&
+                System.currentTimeMillis() >= record.retryTimeUTC;
+    }
+
+    /**
+     * Drops retry state for every offset below {@code kafkaOffset}.
+     *
+     * @return the offsets that were dropped
+     */
+    @Override
+    public Set<Long> clearInvalidMessages(Long kafkaOffset) {
+        Set<Long> invalidOffsets = new HashSet<Long>();
+        // Removing while iterating is safe here because records is a ConcurrentHashMap.
+        for (Long offset : records.keySet()) {
+            if (offset < kafkaOffset) {
+                MessageRetryRecord record = this.records.remove(offset);
+                if (record != null) {
+                    this.waiting.remove(record);
+                    invalidOffsets.add(offset);
+                }
+            }
+        }
+        return invalidOffsets;
+    }
+
+    /**
+     * A MessageRetryRecord holds the data of how many times a message has
+     * failed and been retried, and when its next retry is due. The retry time is
+     * computed with an exponential back-off using the settings this manager was
+     * constructed with:
+     * <ul>
+     * <li>retryInitialDelayMs - time to delay before the first retry</li>
+     * <li>retryDelayMultiplier - multiplier by which to increase the delay for each subsequent retry</li>
+     * <li>retryDelayMaxMs - maximum retry delay (once this delay time is reached, subsequent retries will
+     * delay for this amount of time every time)
+     * </li>
+     * </ul>
+     * Records are equal when their offsets are equal.
+     */
+    private class MessageRetryRecord {
+        private final long offset;
+        private final int retryNum;
+        private final long retryTimeUTC;
+
+        public MessageRetryRecord(long offset) {
+            this(offset, 1);
+        }
+
+        private MessageRetryRecord(long offset, int retryNum) {
+            this.offset = offset;
+            this.retryNum = retryNum;
+            this.retryTimeUTC = System.currentTimeMillis() + calculateRetryDelay();
+        }
+
+        /**
+         * Create a MessageRetryRecord for the next retry that should occur after this one.
+         * @return a new, never-null record for the same offset with the retry number incremented
+         *         and the retry time recomputed from now
+         */
+        public MessageRetryRecord createNextRetryRecord() {
+            return new MessageRetryRecord(this.offset, this.retryNum + 1);
+        }
+
+        /** Back-off delay for this retry number, clamped to [.., retryDelayMaxMs]. */
+        private long calculateRetryDelay() {
+            double delayMultiplier = Math.pow(retryDelayMultiplier, this.retryNum - 1);
+            double delay = retryInitialDelayMs * delayMultiplier;
+            Long maxLong = Long.MAX_VALUE;
+            // Saturate instead of letting the double-to-long cast misbehave on huge values.
+            long delayThisRetryMs = delay >= maxLong.doubleValue()
+                    ? maxLong
+                    : (long) delay;
+            return Math.min(delayThisRetryMs, retryDelayMaxMs);
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            return (other instanceof MessageRetryRecord
+                    && this.offset == ((MessageRetryRecord) other).offset);
+        }
+
+        @Override
+        public int hashCode() {
+            return Long.valueOf(this.offset).hashCode();
+        }
+    }
+
+    /** Orders records by scheduled retry time, earliest first. */
+    private static class RetryTimeComparator implements Comparator<MessageRetryRecord> {
+
+        @Override
+        public int compare(MessageRetryRecord record1, MessageRetryRecord record2) {
+            return Long.compare(record1.retryTimeUTC, record2.retryTimeUTC);
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
new file mode 100644
index 0000000..448d0c3
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedFetchException.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+/**
+ * Unchecked exception signalling that fetching from a Kafka broker did not succeed, for example
+ * because of a network error or an error code in the fetch response.
+ */
+public class FailedFetchException extends RuntimeException {
+
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param message description of the failure
+     */
+    public FailedFetchException(String message) {
+        super(message);
+    }
+
+    /**
+     * @param e the underlying cause; its string form becomes this exception's message
+     */
+    public FailedFetchException(Exception e) {
+        super(e);
+    }
+
+    /**
+     * Keeps both a context message and the underlying cause.
+     *
+     * @param message description of the failure
+     * @param cause   the underlying cause
+     */
+    public FailedFetchException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
new file mode 100644
index 0000000..e9a7092
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/FailedMsgRetryManager.java
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.util.Set;
+
+/**
+ * Bookkeeping for Kafka offsets whose messages failed and may need to be re-emitted.
+ *
+ * <p>NOTE(review): the per-method descriptions below are inferred from method names and
+ * signatures only; confirm them against the implementations.
+ */
+public interface FailedMsgRetryManager {
+    /** Reports that the message at {@code offset} failed. */
+    void failed(Long offset);
+
+    /** Reports that the message at {@code offset} was acked. */
+    void acked(Long offset);
+
+    /** Reports that a retry of the message at {@code offset} has been started. */
+    void retryStarted(Long offset);
+
+    /** @return the next failed offset to retry (presumably {@code null} when none is ready) */
+    Long nextFailedMessageToRetry();
+
+    /** @return whether the message at {@code offset} should be retried */
+    boolean shouldRetryMsg(Long offset);
+
+    /**
+     * Drops tracked offsets that are no longer valid relative to {@code kafkaOffset}.
+     *
+     * @return the offsets that were cleared
+     */
+    Set<Long> clearInvalidMessages(Long kafkaOffset);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
new file mode 100644
index 0000000..75f5563
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/IntSerializer.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.kafka.common.serialization.Serializer;
+
+import java.nio.ByteBuffer;
+import java.nio.IntBuffer;
+import java.util.Map;
+
+/**
+ * Kafka {@link Serializer} that encodes an {@link Integer} as 4 big-endian bytes.
+ */
+public class IntSerializer implements Serializer<Integer> {
+    @Override
+    public void configure(Map<String, ?> map, boolean b) {
+        // Nothing to configure.
+    }
+
+    /**
+     * @param topic ignored
+     * @param val   the value to encode; may be {@code null}
+     * @return the 4-byte big-endian encoding of {@code val}, or {@code null} if {@code val} is null
+     */
+    @Override
+    public byte[] serialize(String topic, Integer val) {
+        if (val == null) {
+            // Avoid an NPE on unboxing; Kafka's own IntegerSerializer also maps null to null.
+            return null;
+        }
+        // ByteBuffer defaults to big-endian, matching the previous IntBuffer-view encoding.
+        return ByteBuffer.allocate(4).putInt(val).array();
+    }
+
+    @Override
+    public void close() {
+        // No resources to release.
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
new file mode 100644
index 0000000..e1e1d24
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaConfig.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.MultiScheme;
+import org.apache.storm.spout.RawMultiScheme;
+
+import java.io.Serializable;
+
+public class KafkaConfig implements Serializable {
+ private static final long serialVersionUID = 5276718734571623855L;
+
+ public final BrokerHosts hosts;
+ public final String topic;
+ public final String clientId;
+
+ public int fetchSizeBytes = 1024 * 1024;
+ public int socketTimeoutMs = 10000;
+ public int fetchMaxWait = 10000;
+ public int bufferSizeBytes = 1024 * 1024;
+ public MultiScheme scheme = new RawMultiScheme();
+ public boolean ignoreZkOffsets = false;
+ public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
+ public long maxOffsetBehind = Long.MAX_VALUE;
+ public boolean useStartOffsetTimeIfOffsetOutOfRange = true;
+ public int metricsTimeBucketSizeInSecs = 60;
+
+ public KafkaConfig(BrokerHosts hosts, String topic) {
+ this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
+ }
+
+ public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
+ this.hosts = hosts;
+ this.topic = topic;
+ this.clientId = clientId;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
new file mode 100644
index 0000000..1d866e7
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaError.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+public enum KafkaError {
+ NO_ERROR,
+ OFFSET_OUT_OF_RANGE,
+ INVALID_MESSAGE,
+ UNKNOWN_TOPIC_OR_PARTITION,
+ INVALID_FETCH_SIZE,
+ LEADER_NOT_AVAILABLE,
+ NOT_LEADER_FOR_PARTITION,
+ REQUEST_TIMED_OUT,
+ BROKER_NOT_AVAILABLE,
+ REPLICA_NOT_AVAILABLE,
+ MESSAGE_SIZE_TOO_LARGE,
+ STALE_CONTROLLER_EPOCH,
+ OFFSET_METADATA_TOO_LARGE,
+ UNKNOWN;
+
+ public static KafkaError getError(int errorCode) {
+ if (errorCode < 0 || errorCode >= UNKNOWN.ordinal()) {
+ return UNKNOWN;
+ } else {
+ return values()[errorCode];
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
new file mode 100644
index 0000000..7a83ae0
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaSpout.java
@@ -0,0 +1,198 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.Config;
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.spout.SpoutOutputCollector;
+import org.apache.storm.task.TopologyContext;
+import org.apache.storm.topology.OutputFieldsDeclarer;
+import org.apache.storm.topology.base.BaseRichSpout;
+import com.google.common.base.Strings;
+import kafka.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.PartitionManager.KafkaMessageId;
+
+import java.util.*;
+
+// TODO: need to add blacklisting
+// TODO: need to make a best effort to not re-emit messages if we don't have to
+/**
+ * Storm spout that reads messages from Kafka and emits them as tuples.
+ *
+ * <p>This task's partitions are assigned by a {@link PartitionCoordinator}: a
+ * {@link StaticCoordinator} when the hosts are {@link StaticHosts}, otherwise a
+ * {@link ZkCoordinator}. Per-partition emitting, acking and committing is delegated to
+ * {@link PartitionManager}s. {@link ZkState} is configured against ZooKeeper in {@link #open}.
+ */
+public class KafkaSpout extends BaseRichSpout {
+    /** Result of asking a PartitionManager to emit, as interpreted by {@link #nextTuple()}. */
+    static enum EmitState {
+        /** Something was emitted; the same partition is polled again on the next call. */
+        EMITTED_MORE_LEFT,
+        /** Something was emitted; the next call moves on to the next partition. */
+        EMITTED_END,
+        /** Nothing was emitted; nextTuple tries the next partition. */
+        NO_EMITTED
+    }
+
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaSpout.class);
+
+    SpoutConfig _spoutConfig;
+    SpoutOutputCollector _collector;
+    PartitionCoordinator _coordinator;
+    DynamicPartitionConnections _connections;
+    ZkState _state;
+
+    /** Wall-clock time (ms) of the last commit(). */
+    long _lastUpdateMs = 0;
+
+    /** Round-robin cursor into the list of managed partitions. */
+    int _currPartitionIndex = 0;
+
+    public KafkaSpout(SpoutConfig spoutConf) {
+        _spoutConfig = spoutConf;
+    }
+
+    @Override
+    public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
+        _collector = collector;
+        String topologyInstanceId = context.getStormId();
+        // ZooKeeper settings for spout state: spout-specific values win, otherwise fall back to
+        // the cluster's ZooKeeper servers/port from the topology configuration.
+        Map stateConf = new HashMap(conf);
+        List<String> zkServers = _spoutConfig.zkServers;
+        if (zkServers == null) {
+            zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+        }
+        Integer zkPort = _spoutConfig.zkPort;
+        if (zkPort == null) {
+            zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+        }
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
+        stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
+        _state = new ZkState(stateConf);
+
+        _connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
+
+        // using TransactionalState like this is a hack
+        int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
+        if (_spoutConfig.hosts instanceof StaticHosts) {
+            _coordinator = new StaticCoordinator(_connections, conf,
+                    _spoutConfig, _state, context.getThisTaskIndex(),
+                    totalTasks, topologyInstanceId);
+        } else {
+            _coordinator = new ZkCoordinator(_connections, conf,
+                    _spoutConfig, _state, context.getThisTaskIndex(),
+                    totalTasks, topologyInstanceId);
+        }
+
+        context.registerMetric("kafkaOffset", new IMetric() {
+            KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_connections);
+
+            @Override
+            public Object getValueAndReset() {
+                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
+                Set<Partition> latestPartitions = new HashSet<Partition>();
+                for (PartitionManager pm : pms) {
+                    latestPartitions.add(pm.getPartition());
+                }
+                _kafkaOffsetMetric.refreshPartitions(latestPartitions);
+                for (PartitionManager pm : pms) {
+                    _kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
+                }
+                return _kafkaOffsetMetric.getValueAndReset();
+            }
+        }, _spoutConfig.metricsTimeBucketSizeInSecs);
+
+        context.registerMetric("kafkaPartition", new IMetric() {
+            @Override
+            public Object getValueAndReset() {
+                List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
+                Map concatMetricsDataMaps = new HashMap();
+                for (PartitionManager pm : pms) {
+                    concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
+                }
+                return concatMetricsDataMaps;
+            }
+        }, _spoutConfig.metricsTimeBucketSizeInSecs);
+    }
+
+    @Override
+    public void close() {
+        // _state is assigned in open(); stay safe if close() runs without a completed open().
+        if (_state != null) {
+            _state.close();
+        }
+    }
+
+    /**
+     * Polls the managed partitions round-robin, stopping at the first one that emits, then
+     * commits if the state update interval has elapsed.
+     */
+    @Override
+    public void nextTuple() {
+        List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
+        for (int i = 0; i < managers.size(); i++) {
+
+            try {
+                // in case the number of managers decreased
+                _currPartitionIndex = _currPartitionIndex % managers.size();
+                EmitState state = managers.get(_currPartitionIndex).next(_collector);
+                if (state != EmitState.EMITTED_MORE_LEFT) {
+                    _currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
+                }
+                if (state != EmitState.NO_EMITTED) {
+                    break;
+                }
+            } catch (FailedFetchException e) {
+                LOG.warn("Fetch failed", e);
+                _coordinator.refresh();
+            }
+        }
+
+        long diffWithNow = System.currentTimeMillis() - _lastUpdateMs;
+
+        /*
+         * System.currentTimeMillis() follows the system clock, which can be moved backwards
+         * externally; a negative difference therefore also triggers a commit.
+         */
+        if (diffWithNow > _spoutConfig.stateUpdateIntervalMs || diffWithNow < 0) {
+            commit();
+        }
+    }
+
+    /** Forwards the ack to the partition's manager, if this task still manages that partition. */
+    @Override
+    public void ack(Object msgId) {
+        KafkaMessageId id = (KafkaMessageId) msgId;
+        PartitionManager m = _coordinator.getManager(id.partition);
+        if (m != null) {
+            m.ack(id.offset);
+        }
+    }
+
+    /** Forwards the failure to the partition's manager, if this task still manages that partition. */
+    @Override
+    public void fail(Object msgId) {
+        KafkaMessageId id = (KafkaMessageId) msgId;
+        PartitionManager m = _coordinator.getManager(id.partition);
+        if (m != null) {
+            m.fail(id.offset);
+        }
+    }
+
+    @Override
+    public void deactivate() {
+        commit();
+    }
+
+    /** Declares the scheme's fields on the configured stream, or on the default stream if none is set. */
+    @Override
+    public void declareOutputFields(OutputFieldsDeclarer declarer) {
+        if (!Strings.isNullOrEmpty(_spoutConfig.outputStreamId)) {
+            declarer.declareStream(_spoutConfig.outputStreamId, _spoutConfig.scheme.getOutputFields());
+        } else {
+            declarer.declare(_spoutConfig.scheme.getOutputFields());
+        }
+    }
+
+    /** Records the commit time and calls commit() on every managed partition. */
+    private void commit() {
+        _lastUpdateMs = System.currentTimeMillis();
+        for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
+            manager.commit();
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
new file mode 100644
index 0000000..8cd0fd0
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KafkaUtils.java
@@ -0,0 +1,275 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.metric.api.IMetric;
+import org.apache.storm.utils.Utils;
+import com.google.common.base.Preconditions;
+import kafka.api.FetchRequest;
+import kafka.api.FetchRequestBuilder;
+import kafka.api.PartitionOffsetRequestInfo;
+import kafka.common.TopicAndPartition;
+import kafka.javaapi.FetchResponse;
+import kafka.javaapi.OffsetRequest;
+import kafka.javaapi.consumer.SimpleConsumer;
+import kafka.javaapi.message.ByteBufferMessageSet;
+import kafka.message.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.apache.storm.kafka.trident.GlobalPartitionInformation;
+import org.apache.storm.kafka.trident.IBrokerReader;
+import org.apache.storm.kafka.trident.StaticBrokerReader;
+import org.apache.storm.kafka.trident.ZkBrokerReader;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.net.SocketTimeoutException;
+import java.nio.ByteBuffer;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.*;
+
+
+/**
+ * Static helpers for the Kafka spout code. Covers:
+ * <ul>
+ *   <li>broker-reader construction and offset lookups;</li>
+ *   <li>message fetching and tuple generation;</li>
+ *   <li>partition-to-task assignment and the kafkaOffset metric.</li>
+ * </ul>
+ */
+public class KafkaUtils {
+
+    public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
+    /** Sentinel returned by getOffset when the broker response contains no offset. */
+    private static final int NO_OFFSET = -5;
+
+
+    /**
+     * Creates the broker reader matching the configured hosts type.
+     *
+     * @param stormConf Storm configuration, passed to the ZooKeeper-based reader
+     * @param conf      Kafka configuration supplying the topic and hosts
+     * @return a {@link StaticBrokerReader} for {@link StaticHosts}; otherwise a
+     *         {@link ZkBrokerReader}, in which case {@code conf.hosts} must be a {@link ZkHosts}
+     */
+    public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
+        if (conf.hosts instanceof StaticHosts) {
+            return new StaticBrokerReader(conf.topic, ((StaticHosts) conf.hosts).getPartitionInformation());
+        } else {
+            return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
+        }
+    }
+
+
+    /**
+     * Looks up the offset for {@code config.startOffsetTime}.
+     *
+     * @see #getOffset(SimpleConsumer, String, int, long)
+     */
+    public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
+        long startOffsetTime = config.startOffsetTime;
+        return getOffset(consumer, topic, partition, startOffsetTime);
+    }
+
+    /**
+     * Requests a single offset for {@code topic}/{@code partition} at the given time.
+     *
+     * @param startOffsetTime a timestamp, or a Kafka special value such as
+     *                        {@code kafka.api.OffsetRequest.LatestTime()} or {@code EarliestTime()}
+     * @return the first offset in the broker's response, or {@code NO_OFFSET} (-5) if there is none
+     */
+    public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
+        TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+        Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+        requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
+        OffsetRequest request = new OffsetRequest(
+                requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+
+        long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
+        if (offsets.length > 0) {
+            return offsets[0];
+        } else {
+            return NO_OFFSET;
+        }
+    }
+
+    /**
+     * Metric reporting broker offsets, latest emitted offsets and spout lag.
+     *
+     * <p>Values are reported per partition ({@code topic/partition/...}) and as per-topic
+     * totals ({@code topic/total...}).
+     */
+    public static class KafkaOffsetMetric implements IMetric {
+        Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
+        Set<Partition> _partitions;
+        DynamicPartitionConnections _connections;
+
+        public KafkaOffsetMetric(DynamicPartitionConnections connections) {
+            _connections = connections;
+        }
+
+        public void setLatestEmittedOffset(Partition partition, long offset) {
+            _partitionToOffset.put(partition, offset);
+        }
+
+        /** Per-topic running totals; static because it needs no access to the enclosing metric. */
+        private static class TopicMetrics {
+            long totalSpoutLag = 0;
+            long totalEarliestTimeOffset = 0;
+            long totalLatestTimeOffset = 0;
+            long totalLatestEmittedOffset = 0;
+        }
+
+        /**
+         * Computes the offset metrics. Despite the name, no internal state is reset here.
+         *
+         * @return a map of metric name to value; {@code null} (with a log message) when:
+         *         <ul>
+         *           <li>not every refreshed partition has reported an emitted offset;</li>
+         *           <li>a partition has no connection, or no data was found for it;</li>
+         *           <li>the computation throws.</li>
+         *         </ul>
+         */
+        @Override
+        public Object getValueAndReset() {
+            try {
+                Map<String, Long> ret = new HashMap<String, Long>();
+                if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
+                    // TreeMap so per-topic totals are produced in topic-name order.
+                    Map<String, TopicMetrics> topicMetricsMap = new TreeMap<String, TopicMetrics>();
+                    for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
+                        Partition partition = e.getKey();
+                        SimpleConsumer consumer = _connections.getConnection(partition);
+                        if (consumer == null) {
+                            LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
+                            return null;
+                        }
+                        long latestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
+                        long earliestTimeOffset = getOffset(consumer, partition.topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
+                        if (latestTimeOffset == KafkaUtils.NO_OFFSET) {
+                            LOG.warn("No data found in Kafka Partition " + partition.getId());
+                            return null;
+                        }
+                        long latestEmittedOffset = e.getValue();
+                        long spoutLag = latestTimeOffset - latestEmittedOffset;
+                        String topic = partition.topic;
+                        String metricPath = partition.getId();
+                        //Handle the case where Partition Path Id does not contain topic name Partition.getId() == "partition_" + partition
+                        if (!metricPath.startsWith(topic + "/")) {
+                            metricPath = topic + "/" + metricPath;
+                        }
+                        ret.put(metricPath + "/" + "spoutLag", spoutLag);
+                        ret.put(metricPath + "/" + "earliestTimeOffset", earliestTimeOffset);
+                        ret.put(metricPath + "/" + "latestTimeOffset", latestTimeOffset);
+                        ret.put(metricPath + "/" + "latestEmittedOffset", latestEmittedOffset);
+
+                        TopicMetrics topicMetrics = topicMetricsMap.get(topic);
+                        if (topicMetrics == null) {
+                            topicMetrics = new TopicMetrics();
+                            topicMetricsMap.put(topic, topicMetrics);
+                        }
+                        topicMetrics.totalSpoutLag += spoutLag;
+                        topicMetrics.totalEarliestTimeOffset += earliestTimeOffset;
+                        topicMetrics.totalLatestTimeOffset += latestTimeOffset;
+                        topicMetrics.totalLatestEmittedOffset += latestEmittedOffset;
+                    }
+
+                    for (Map.Entry<String, TopicMetrics> e : topicMetricsMap.entrySet()) {
+                        String topic = e.getKey();
+                        TopicMetrics topicMetrics = e.getValue();
+                        ret.put(topic + "/" + "totalSpoutLag", topicMetrics.totalSpoutLag);
+                        ret.put(topic + "/" + "totalEarliestTimeOffset", topicMetrics.totalEarliestTimeOffset);
+                        ret.put(topic + "/" + "totalLatestTimeOffset", topicMetrics.totalLatestTimeOffset);
+                        ret.put(topic + "/" + "totalLatestEmittedOffset", topicMetrics.totalLatestEmittedOffset);
+                    }
+
+                    return ret;
+                } else {
+                    LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
+                }
+            } catch (Throwable t) {
+                LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
+            }
+            return null;
+        }
+
+        /**
+         * Replaces the set of partitions being reported and drops emitted offsets for
+         * partitions that are no longer present.
+         */
+        public void refreshPartitions(Set<Partition> partitions) {
+            _partitions = partitions;
+            Iterator<Partition> it = _partitionToOffset.keySet().iterator();
+            while (it.hasNext()) {
+                if (!partitions.contains(it.next())) {
+                    it.remove();
+                }
+            }
+        }
+    }
+
+    /**
+     * Fetches messages for {@code partition} starting at {@code offset}.
+     *
+     * @throws TopicOffsetOutOfRangeException if the broker reports OFFSET_OUT_OF_RANGE and
+     *         {@code config.useStartOffsetTimeIfOffsetOutOfRange} is set
+     * @throws FailedFetchException on network errors or any other error code in the response
+     * @throws RuntimeException wrapping any other exception thrown by the consumer
+     */
+    public static ByteBufferMessageSet fetchMessages(KafkaConfig config, SimpleConsumer consumer, Partition partition, long offset)
+            throws TopicOffsetOutOfRangeException, FailedFetchException, RuntimeException {
+        String topic = partition.topic;
+        int partitionId = partition.partition;
+        FetchRequestBuilder builder = new FetchRequestBuilder();
+        FetchRequest fetchRequest = builder.addFetch(topic, partitionId, offset, config.fetchSizeBytes).
+                clientId(config.clientId).maxWait(config.fetchMaxWait).build();
+        FetchResponse fetchResponse;
+        try {
+            fetchResponse = consumer.fetch(fetchRequest);
+        } catch (Exception e) {
+            // Caught broadly and classified with instanceof: presumably the consumer can throw
+            // IOExceptions without declaring them, so they cannot be caught by type here.
+            // IOException already covers ConnectException and SocketTimeoutException.
+            if (e instanceof IOException || e instanceof UnresolvedAddressException) {
+                LOG.warn("Network error when fetching messages:", e);
+                throw new FailedFetchException(e);
+            } else {
+                throw new RuntimeException(e);
+            }
+        }
+        if (fetchResponse.hasError()) {
+            KafkaError error = KafkaError.getError(fetchResponse.errorCode(topic, partitionId));
+            if (error.equals(KafkaError.OFFSET_OUT_OF_RANGE) && config.useStartOffsetTimeIfOffsetOutOfRange) {
+                String msg = partition + " Got fetch request with offset out of range: [" + offset + "]";
+                LOG.warn(msg);
+                throw new TopicOffsetOutOfRangeException(msg);
+            } else {
+                String message = "Error fetching data from [" + partition + "] for topic [" + topic + "]: [" + error + "]";
+                LOG.error(message);
+                throw new FailedFetchException(message);
+            }
+        }
+        return fetchResponse.messageSet(topic, partitionId);
+    }
+
+
+    /**
+     * Deserializes a Kafka message with the configured scheme.
+     *
+     * <p>Uses key+value deserialization when the message has a key and the scheme is a
+     * {@link KeyValueSchemeAsMultiScheme}. Uses topic-aware deserialization for
+     * {@link StringMultiSchemeWithTopic}. Otherwise only the payload is deserialized.
+     *
+     * @return the tuples, or {@code null} if the message has no payload
+     */
+    public static Iterable<List<Object>> generateTuples(KafkaConfig kafkaConfig, Message msg, String topic) {
+        Iterable<List<Object>> tups;
+        ByteBuffer payload = msg.payload();
+        if (payload == null) {
+            return null;
+        }
+        ByteBuffer key = msg.key();
+        if (key != null && kafkaConfig.scheme instanceof KeyValueSchemeAsMultiScheme) {
+            tups = ((KeyValueSchemeAsMultiScheme) kafkaConfig.scheme).deserializeKeyAndValue(key, payload);
+        } else {
+            if (kafkaConfig.scheme instanceof StringMultiSchemeWithTopic) {
+                tups = ((StringMultiSchemeWithTopic) kafkaConfig.scheme).deserializeWithTopic(topic, payload);
+            } else {
+                tups = kafkaConfig.scheme.deserialize(payload);
+            }
+        }
+        return tups;
+    }
+
+    /**
+     * Deserializes a Kafka message together with its partition and offset.
+     *
+     * @return the tuples, or {@code null} if the message has no payload
+     */
+    public static Iterable<List<Object>> generateTuples(MessageMetadataSchemeAsMultiScheme scheme, Message msg, Partition partition, long offset) {
+        ByteBuffer payload = msg.payload();
+        if (payload == null) {
+            return null;
+        }
+        return scheme.deserializeMessageWithMetadata(payload, partition, offset);
+    }
+
+
+    /**
+     * Assigns partitions to a task round-robin.
+     *
+     * <p>The task with index {@code taskIndex} receives every {@code totalTasks}-th partition of
+     * the concatenated, ordered partition lists, starting at position {@code taskIndex}.
+     *
+     * @throws IllegalArgumentException if {@code taskIndex} is negative or not less than
+     *         {@code totalTasks}
+     */
+    public static List<Partition> calculatePartitionsForTask(List<GlobalPartitionInformation> partitionInfos, int totalTasks, int taskIndex) {
+        Preconditions.checkArgument(taskIndex >= 0, "task index must not be negative");
+        Preconditions.checkArgument(taskIndex < totalTasks, "task index must be less than total tasks");
+        List<Partition> taskPartitions = new ArrayList<Partition>();
+        List<Partition> partitions = new ArrayList<Partition>();
+        for (GlobalPartitionInformation partitionInformation : partitionInfos) {
+            partitions.addAll(partitionInformation.getOrderedPartitions());
+        }
+        int numPartitions = partitions.size();
+        if (numPartitions < totalTasks) {
+            LOG.warn("there are more tasks than partitions (tasks: " + totalTasks + "; partitions: " + numPartitions + "), some tasks will be idle");
+        }
+        for (int i = taskIndex; i < numPartitions; i += totalTasks) {
+            Partition taskPartition = partitions.get(i);
+            taskPartitions.add(taskPartition);
+        }
+        logPartitionMapping(totalTasks, taskIndex, taskPartitions);
+        return taskPartitions;
+    }
+
+    /** Logs the partitions assigned to a task; an empty assignment is logged as a warning. */
+    private static void logPartitionMapping(int totalTasks, int taskIndex, List<Partition> taskPartitions) {
+        String taskPrefix = taskId(taskIndex, totalTasks);
+        if (taskPartitions.isEmpty()) {
+            LOG.warn(taskPrefix + "no partitions assigned");
+        } else {
+            LOG.info(taskPrefix + "assigned " + taskPartitions);
+        }
+    }
+
+    /** @return a log prefix of the form {@code "Task [n/total] "} with a 1-based task number */
+    public static String taskId(int taskIndex, int totalTasks) {
+        return "Task [" + (taskIndex + 1) + "/" + totalTasks + "] ";
+    }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
new file mode 100644
index 0000000..3f9acc2
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueScheme.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.Scheme;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A {@link Scheme} that can also deserialize a Kafka message together with its key.
+ */
+public interface KeyValueScheme extends Scheme {
+    /**
+     * Deserializes a keyed message into one tuple.
+     *
+     * @param key   the message key; KafkaUtils.generateTuples only calls this when it is non-null
+     * @param value the message payload
+     * @return the tuple values; KeyValueSchemeAsMultiScheme treats {@code null} as "no tuple"
+     */
+    public List<Object> deserializeKeyAndValue(ByteBuffer key, ByteBuffer value);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
new file mode 100644
index 0000000..25053dd
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/KeyValueSchemeAsMultiScheme.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.SchemeAsMultiScheme;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+public class KeyValueSchemeAsMultiScheme extends SchemeAsMultiScheme {
+
+ public KeyValueSchemeAsMultiScheme(KeyValueScheme scheme) {
+ super(scheme);
+ }
+
+ public Iterable<List<Object>> deserializeKeyAndValue(final ByteBuffer key, final ByteBuffer value) {
+ List<Object> o = ((KeyValueScheme)scheme).deserializeKeyAndValue(key, value);
+ if(o == null) return null;
+ else return Arrays.asList(o);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
new file mode 100644
index 0000000..d0fc08e
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataScheme.java
@@ -0,0 +1,27 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import org.apache.storm.spout.Scheme;
+
+import java.nio.ByteBuffer;
+import java.util.List;
+
+/**
+ * A {@link Scheme} whose deserialization can additionally take into account the partition
+ * and offset associated with a message.
+ */
+public interface MessageMetadataScheme extends Scheme {
+
+    /**
+     * Deserializes {@code message} with access to its partition and offset.
+     *
+     * @param message   the raw message bytes
+     * @param partition the partition associated with {@code message}
+     * @param offset    the offset associated with {@code message}
+     * @return the deserialized tuple values (may be {@code null})
+     */
+    List<Object> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset);
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
new file mode 100644
index 0000000..a53fa88
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/MessageMetadataSchemeAsMultiScheme.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.storm.spout.SchemeAsMultiScheme;
+
+public class MessageMetadataSchemeAsMultiScheme extends SchemeAsMultiScheme {
+ private static final long serialVersionUID = -7172403703813625116L;
+
+ public MessageMetadataSchemeAsMultiScheme(MessageMetadataScheme scheme) {
+ super(scheme);
+ }
+
+ public Iterable<List<Object>> deserializeMessageWithMetadata(ByteBuffer message, Partition partition, long offset) {
+ List<Object> o = ((MessageMetadataScheme) scheme).deserializeMessageWithMetadata(message, partition, offset);
+ if (o == null) {
+ return null;
+ } else {
+ return Arrays.asList(o);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
new file mode 100644
index 0000000..afdf8af
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/Partition.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import com.google.common.base.Objects;
+import org.apache.storm.trident.spout.ISpoutPartition;
+
+
+/**
+ * Identifies one partition of a topic together with the {@link Broker} recorded as its host.
+ *
+ * <p>{@link #equals(Object)} and {@link #hashCode()} consider only {@code host}, {@code topic}
+ * and {@code partition}; the path-id flag influences {@link #getId()} only.
+ */
+public class Partition implements ISpoutPartition {
+
+    public Broker host;
+    public int partition;
+    public String topic;
+
+    // Flag to keep the Partition Path Id backward compatible with the old implementation of
+    // Partition.getId() == "partition_" + partition. A null value (left by the no-arg
+    // constructor or passed explicitly) is treated as false.
+    private Boolean bUseTopicNameForPartitionPathId;
+
+    // for kryo compatibility
+    private Partition() {
+
+    }
+
+    /**
+     * Creates a partition whose {@link #getId()} uses the legacy {@code "partition_<n>"} format.
+     *
+     * @param host      the broker recorded as the partition's host
+     * @param topic     the topic name
+     * @param partition the partition number
+     */
+    public Partition(Broker host, String topic, int partition) {
+        this(host, topic, partition, false);
+    }
+
+    /**
+     * @param host                            the broker recorded as the partition's host
+     * @param topic                           the topic name
+     * @param partition                       the partition number
+     * @param bUseTopicNameForPartitionPathId when {@code true}, {@link #getId()} is prefixed with
+     *                                        the topic name; {@code false} or {@code null} keeps
+     *                                        the legacy format
+     */
+    public Partition(Broker host, String topic, int partition, Boolean bUseTopicNameForPartitionPathId) {
+        this.topic = topic;
+        this.host = host;
+        this.partition = partition;
+        this.bUseTopicNameForPartitionPathId = bUseTopicNameForPartitionPathId;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hashCode(host, topic, partition);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+        if (this == obj) {
+            return true;
+        }
+        if (obj == null || getClass() != obj.getClass()) {
+            return false;
+        }
+        final Partition other = (Partition) obj;
+        return Objects.equal(this.host, other.host)
+                && Objects.equal(this.topic, other.topic)
+                && this.partition == other.partition;
+    }
+
+    @Override
+    public String toString() {
+        return "Partition{" +
+                "host=" + host +
+                ", topic=" + topic +
+                ", partition=" + partition +
+                '}';
+    }
+
+    /**
+     * Returns the id of this partition.
+     *
+     * @return {@code topic + "/partition_" + partition} when the topic-name flag is {@code true},
+     *         otherwise the legacy {@code "partition_" + partition}
+     */
+    @Override
+    public String getId() {
+        // Boolean.TRUE.equals avoids the NullPointerException that auto-unboxing a null flag
+        // would throw (e.g. after the no-arg constructor), falling back to the legacy id.
+        if (Boolean.TRUE.equals(bUseTopicNameForPartitionPathId)) {
+            return topic + "/partition_" + partition;
+        }
+        // Keep the Partition Id backward compatible with the old implementation of
+        // Partition.getId() == "partition_" + partition
+        return "partition_" + partition;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/storm/blob/d839d1bf/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
new file mode 100644
index 0000000..c9004fa
--- /dev/null
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/PartitionCoordinator.java
@@ -0,0 +1,28 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.storm.kafka;
+
+import java.util.List;
+
+/**
+ * Provides access to the {@link PartitionManager}s managed by an implementation.
+ */
+public interface PartitionCoordinator {
+
+    /**
+     * @return the partition managers currently managed (presumably those assigned to the
+     *         calling task — confirm against implementations)
+     */
+    List<PartitionManager> getMyManagedPartitions();
+
+    /**
+     * Looks up the manager for {@code partition}.
+     *
+     * <p>NOTE(review): behavior for a partition that is not managed (null vs. exception) is
+     * implementation-defined — confirm.
+     *
+     * @param partition the partition to look up
+     * @return the manager for {@code partition}
+     */
+    PartitionManager getManager(Partition partition);
+
+    /**
+     * Refreshes the coordinator's view of its managed partitions.
+     *
+     * <p>NOTE(review): exact semantics are implementation-defined — presumably re-evaluates which
+     * partitions are managed.
+     */
+    void refresh();
+}