You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:03 UTC
[04/50] [abbrv] git commit: use consistent formatting
use consistent formatting
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/e8f54d63
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/e8f54d63
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/e8f54d63
Branch: refs/heads/master
Commit: e8f54d63094806a2a1364adb5e209ab2ca10f0f0
Parents: d35a6ee
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sun Dec 22 15:58:01 2013 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sun Dec 22 15:58:01 2013 +0000
----------------------------------------------------------------------
src/jvm/storm/kafka/DynamicBrokersReader.java | 158 +++----
.../kafka/DynamicPartitionConnections.java | 34 +-
src/jvm/storm/kafka/HostPort.java | 50 +-
src/jvm/storm/kafka/KafkaConfig.java | 16 +-
src/jvm/storm/kafka/KafkaSpout.java | 45 +-
src/jvm/storm/kafka/Partition.java | 50 +-
src/jvm/storm/kafka/PartitionCoordinator.java | 1 +
src/jvm/storm/kafka/PartitionManager.java | 100 ++--
src/jvm/storm/kafka/StaticCoordinator.java | 12 +-
src/jvm/storm/kafka/StaticHosts.java | 17 +-
.../storm/kafka/StaticPartitionConnections.java | 8 +-
src/jvm/storm/kafka/ZkCoordinator.java | 48 +-
src/jvm/storm/kafka/ZkHosts.java | 22 +-
src/jvm/storm/kafka/ZkState.java | 72 +--
src/jvm/storm/kafka/trident/Coordinator.java | 44 +-
.../storm/kafka/trident/DefaultCoordinator.java | 2 +-
.../trident/GlobalPartitionInformation.java | 86 ++--
.../storm/kafka/trident/IBatchCoordinator.java | 1 +
src/jvm/storm/kafka/trident/IBrokerReader.java | 1 +
src/jvm/storm/kafka/trident/KafkaUtils.java | 74 +--
src/jvm/storm/kafka/trident/MaxMetric.java | 8 +-
.../kafka/trident/OpaqueTridentKafkaSpout.java | 14 +-
.../storm/kafka/trident/StaticBrokerReader.java | 6 +-
.../trident/TransactionalTridentKafkaSpout.java | 8 +-
.../storm/kafka/trident/TridentKafkaConfig.java | 6 +-
.../kafka/trident/TridentKafkaEmitter.java | 466 ++++++++++---------
src/jvm/storm/kafka/trident/ZkBrokerReader.java | 12 +-
.../storm/kafka/DynamicBrokersReaderTest.java | 258 +++++-----
28 files changed, 821 insertions(+), 798 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/DynamicBrokersReader.java b/src/jvm/storm/kafka/DynamicBrokersReader.java
index ae15534..c802baf 100644
--- a/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ b/src/jvm/storm/kafka/DynamicBrokersReader.java
@@ -12,117 +12,117 @@ import storm.kafka.trident.GlobalPartitionInformation;
import java.io.IOException;
import java.io.UnsupportedEncodingException;
-import java.util.*;
+import java.util.List;
+import java.util.Map;
public class DynamicBrokersReader {
- public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
+ public static final Logger LOG = LoggerFactory.getLogger(DynamicBrokersReader.class);
private CuratorFramework _curator;
private String _zkPath;
private String _topic;
-
+
public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
- _zkPath = zkPath;
- _topic = topic;
- try {
- _curator = CuratorFrameworkFactory.newClient(
- zkStr,
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
- 15000,
- new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
- _curator.start();
- } catch (IOException ex) {
- LOG.error("can't connect to zookeeper");
- }
+ _zkPath = zkPath;
+ _topic = topic;
+ try {
+ _curator = CuratorFrameworkFactory.newClient(
+ zkStr,
+ Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+ 15000,
+ new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+ Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+ _curator.start();
+ } catch (IOException ex) {
+ LOG.error("can't connect to zookeeper");
+ }
}
-
+
/**
- * Get all partitions with their current leaders
+ * Get all partitions with their current leaders
*/
public GlobalPartitionInformation getBrokerInfo() {
- GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+ GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
try {
- int numPartitionsForTopic = getNumPartitions();
- String brokerInfoPath = brokerPath();
- for (int partition = 0; partition < numPartitionsForTopic; partition++) {
- int leader = getLeaderFor(partition);
- String path = brokerInfoPath + "/" + leader;
- try {
- byte[] hostPortData = _curator.getData().forPath(path);
- HostPort hp = getBrokerHost(hostPortData);
- globalPartitionInformation.addPartition(partition, hp);
- } catch(org.apache.zookeeper.KeeperException.NoNodeException e) {
- LOG.error("Node {} does not exist ", path);
- }
- }
- } catch(Exception e) {
+ int numPartitionsForTopic = getNumPartitions();
+ String brokerInfoPath = brokerPath();
+ for (int partition = 0; partition < numPartitionsForTopic; partition++) {
+ int leader = getLeaderFor(partition);
+ String path = brokerInfoPath + "/" + leader;
+ try {
+ byte[] hostPortData = _curator.getData().forPath(path);
+ HostPort hp = getBrokerHost(hostPortData);
+ globalPartitionInformation.addPartition(partition, hp);
+ } catch (org.apache.zookeeper.KeeperException.NoNodeException e) {
+ LOG.error("Node {} does not exist ", path);
+ }
+ }
+ } catch (Exception e) {
throw new RuntimeException(e);
}
- LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
+ LOG.info("Read partition info from zookeeper: " + globalPartitionInformation);
return globalPartitionInformation;
}
+ private int getNumPartitions() {
+ try {
+ String topicBrokersPath = partitionPath();
+ List<String> children = _curator.getChildren().forPath(topicBrokersPath);
+ return children.size();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
- private int getNumPartitions() {
- try {
- String topicBrokersPath = partitionPath();
- List<String> children = _curator.getChildren().forPath(topicBrokersPath);
- return children.size();
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
- }
-
- public String partitionPath() {
- return _zkPath + "/topics/" + _topic + "/partitions";
- }
+ public String partitionPath() {
+ return _zkPath + "/topics/" + _topic + "/partitions";
+ }
- public String brokerPath() {
- return _zkPath + "/ids";
- }
+ public String brokerPath() {
+ return _zkPath + "/ids";
+ }
- /**
- * get /brokers/topics/distributedTopic/partitions/1/state
- * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
- * @param partition
- * @return
- */
- private int getLeaderFor(long partition) {
- try {
- String topicBrokersPath = partitionPath();
- byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state" );
- Map<Object, Object> value = (Map<Object,Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
- Integer leader = ((Number) value.get("leader")).intValue();
- return leader;
- } catch (Exception e) {
- throw new RuntimeException(e);
- }
- }
+ /**
+ * get /brokers/topics/distributedTopic/partitions/1/state
+ * { "controller_epoch":4, "isr":[ 1, 0 ], "leader":1, "leader_epoch":1, "version":1 }
+ *
+ * @param partition
+ * @return
+ */
+ private int getLeaderFor(long partition) {
+ try {
+ String topicBrokersPath = partitionPath();
+ byte[] hostPortData = _curator.getData().forPath(topicBrokersPath + "/" + partition + "/state");
+ Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(hostPortData, "UTF-8"));
+ Integer leader = ((Number) value.get("leader")).intValue();
+ return leader;
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
public void close() {
_curator.close();
}
- /**
- *
- * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
- * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
- *
- * @param contents
- * @return
- */
+ /**
+ * [zk: localhost:2181(CONNECTED) 56] get /brokers/ids/0
+ * { "host":"localhost", "jmx_port":9999, "port":9092, "version":1 }
+ *
+ * @param contents
+ * @return
+ */
private HostPort getBrokerHost(byte[] contents) {
try {
- Map<Object, Object> value = (Map<Object,Object>) JSONValue.parse(new String(contents, "UTF-8"));
- String host = (String) value.get("host");
- Integer port = ((Long) value.get("port")).intValue();
+ Map<Object, Object> value = (Map<Object, Object>) JSONValue.parse(new String(contents, "UTF-8"));
+ String host = (String) value.get("host");
+ Integer port = ((Long) value.get("port")).intValue();
return new HostPort(host, port);
} catch (UnsupportedEncodingException e) {
throw new RuntimeException(e);
}
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/DynamicPartitionConnections.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/DynamicPartitionConnections.java b/src/jvm/storm/kafka/DynamicPartitionConnections.java
index 409ec37..7a799a0 100644
--- a/src/jvm/storm/kafka/DynamicPartitionConnections.java
+++ b/src/jvm/storm/kafka/DynamicPartitionConnections.java
@@ -13,33 +13,33 @@ import java.util.Set;
public class DynamicPartitionConnections {
- public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
+ public static final Logger LOG = LoggerFactory.getLogger(DynamicPartitionConnections.class);
static class ConnectionInfo {
SimpleConsumer consumer;
Set<Integer> partitions = new HashSet();
-
+
public ConnectionInfo(SimpleConsumer consumer) {
this.consumer = consumer;
}
}
-
+
Map<HostPort, ConnectionInfo> _connections = new HashMap();
KafkaConfig _config;
- IBrokerReader _reader;
-
+ IBrokerReader _reader;
+
public DynamicPartitionConnections(KafkaConfig config, IBrokerReader brokerReader) {
_config = config;
- _reader = brokerReader;
+ _reader = brokerReader;
}
-
+
public SimpleConsumer register(Partition partition) {
- HostPort hostPort = _reader.getCurrentBrokers().getHostFor(partition.partition);
- return register(hostPort, partition.partition);
+ HostPort hostPort = _reader.getCurrentBrokers().getHostFor(partition.partition);
+ return register(hostPort, partition.partition);
}
-
+
public SimpleConsumer register(HostPort host, int partition) {
- if(!_connections.containsKey(host)) {
+ if (!_connections.containsKey(host)) {
_connections.put(host, new ConnectionInfo(new SimpleConsumer(host.host, host.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId)));
}
ConnectionInfo info = _connections.get(host);
@@ -49,14 +49,16 @@ public class DynamicPartitionConnections {
public SimpleConsumer getConnection(Partition partition) {
ConnectionInfo info = _connections.get(partition.host);
- if(info != null) return info.consumer;
+ if (info != null) {
+ return info.consumer;
+ }
return null;
}
-
+
public void unregister(HostPort port, int partition) {
ConnectionInfo info = _connections.get(port);
info.partitions.remove(partition);
- if(info.partitions.isEmpty()) {
+ if (info.partitions.isEmpty()) {
info.consumer.close();
_connections.remove(port);
}
@@ -65,9 +67,9 @@ public class DynamicPartitionConnections {
public void unregister(Partition partition) {
unregister(partition.host, partition.partition);
}
-
+
public void clear() {
- for(ConnectionInfo info: _connections.values()) {
+ for (ConnectionInfo info : _connections.values()) {
info.consumer.close();
}
_connections.clear();
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/HostPort.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/HostPort.java b/src/jvm/storm/kafka/HostPort.java
index afb5da5..5369858 100644
--- a/src/jvm/storm/kafka/HostPort.java
+++ b/src/jvm/storm/kafka/HostPort.java
@@ -1,18 +1,16 @@
package storm.kafka;
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
public class HostPort implements Serializable, Comparable<HostPort> {
public String host;
public int port;
-
+
public HostPort(String host, int port) {
this.host = host;
this.port = port;
}
-
+
public HostPort(String host) {
this(host, 9092);
}
@@ -33,26 +31,26 @@ public class HostPort implements Serializable, Comparable<HostPort> {
return host + ":" + port;
}
- public static HostPort fromString(String host) {
- HostPort hp;
- String[] spec = host.split(":");
- if (spec.length == 1) {
- hp = new HostPort(spec[0]);
- } else if (spec.length == 2) {
- hp = new HostPort(spec[0], Integer.parseInt(spec[1]));
- } else {
- throw new IllegalArgumentException("Invalid host specification: " + host);
- }
- return hp;
- }
-
-
- @Override
- public int compareTo(HostPort o) {
- if ( this.host.equals(o.host)) {
- return this.port - o.port;
- } else {
- return this.host.compareTo(o.host);
- }
- }
+ public static HostPort fromString(String host) {
+ HostPort hp;
+ String[] spec = host.split(":");
+ if (spec.length == 1) {
+ hp = new HostPort(spec[0]);
+ } else if (spec.length == 2) {
+ hp = new HostPort(spec[0], Integer.parseInt(spec[1]));
+ } else {
+ throw new IllegalArgumentException("Invalid host specification: " + host);
+ }
+ return hp;
+ }
+
+
+ @Override
+ public int compareTo(HostPort o) {
+ if (this.host.equals(o.host)) {
+ return this.port - o.port;
+ } else {
+ return this.host.compareTo(o.host);
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/KafkaConfig.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaConfig.java b/src/jvm/storm/kafka/KafkaConfig.java
index 457eeb4..e241978 100644
--- a/src/jvm/storm/kafka/KafkaConfig.java
+++ b/src/jvm/storm/kafka/KafkaConfig.java
@@ -9,11 +9,11 @@ public class KafkaConfig implements Serializable {
public final BrokerHosts hosts;
public final String topic;
- public final String clientId;
+ public final String clientId;
- public int fetchSizeBytes = 1024*1024;
+ public int fetchSizeBytes = 1024 * 1024;
public int socketTimeoutMs = 10000;
- public int bufferSizeBytes = 1024*1024;
+ public int bufferSizeBytes = 1024 * 1024;
public MultiScheme scheme = new RawMultiScheme();
public boolean forceFromStart = false;
public long startOffsetTime = kafka.api.OffsetRequest.EarliestTime();
@@ -22,11 +22,11 @@ public class KafkaConfig implements Serializable {
this(hosts, topic, kafka.api.OffsetRequest.DefaultClientId());
}
- public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
- this.hosts = hosts;
- this.topic = topic;
- this.clientId = clientId;
- }
+ public KafkaConfig(BrokerHosts hosts, String topic, String clientId) {
+ this.hosts = hosts;
+ this.topic = topic;
+ this.clientId = clientId;
+ }
public void forceStartOffsetTime(long millis) {
startOffsetTime = millis;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/KafkaSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaSpout.java b/src/jvm/storm/kafka/KafkaSpout.java
index 781e6ce..cf407ad 100644
--- a/src/jvm/storm/kafka/KafkaSpout.java
+++ b/src/jvm/storm/kafka/KafkaSpout.java
@@ -54,21 +54,25 @@ public class KafkaSpout extends BaseRichSpout {
public void open(Map conf, final TopologyContext context, final SpoutOutputCollector collector) {
_collector = collector;
- Map stateConf = new HashMap(conf);
+ Map stateConf = new HashMap(conf);
List<String> zkServers = _spoutConfig.zkServers;
- if(zkServers==null) zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+ if (zkServers == null) {
+ zkServers = (List<String>) conf.get(Config.STORM_ZOOKEEPER_SERVERS);
+ }
Integer zkPort = _spoutConfig.zkPort;
- if(zkPort==null) zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+ if (zkPort == null) {
+ zkPort = ((Number) conf.get(Config.STORM_ZOOKEEPER_PORT)).intValue();
+ }
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS, zkServers);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_PORT, zkPort);
stateConf.put(Config.TRANSACTIONAL_ZOOKEEPER_ROOT, _spoutConfig.zkRoot);
- _state = new ZkState(stateConf);
+ _state = new ZkState(stateConf);
_connections = new DynamicPartitionConnections(_spoutConfig, KafkaUtils.makeBrokerReader(conf, _spoutConfig));
// using TransactionalState like this is a hack
int totalTasks = context.getComponentTasks(context.getThisComponentId()).size();
- if(_spoutConfig.hosts instanceof StaticHosts) {
+ if (_spoutConfig.hosts instanceof StaticHosts) {
_coordinator = new StaticCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
} else {
_coordinator = new ZkCoordinator(_connections, conf, _spoutConfig, _state, context.getThisTaskIndex(), totalTasks, _uuid);
@@ -76,13 +80,16 @@ public class KafkaSpout extends BaseRichSpout {
context.registerMetric("kafkaOffset", new IMetric() {
KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_spoutConfig.topic, _connections);
+
@Override
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
Set<Partition> latestPartitions = new HashSet();
- for(PartitionManager pm : pms) { latestPartitions.add(pm.getPartition()); }
+ for (PartitionManager pm : pms) {
+ latestPartitions.add(pm.getPartition());
+ }
_kafkaOffsetMetric.refreshPartitions(latestPartitions);
- for(PartitionManager pm : pms) {
+ for (PartitionManager pm : pms) {
_kafkaOffsetMetric.setLatestEmittedOffset(pm.getPartition(), pm.lastCompletedOffset());
}
return _kafkaOffsetMetric.getValueAndReset();
@@ -94,7 +101,7 @@ public class KafkaSpout extends BaseRichSpout {
public Object getValueAndReset() {
List<PartitionManager> pms = _coordinator.getMyManagedPartitions();
Map concatMetricsDataMaps = new HashMap();
- for(PartitionManager pm : pms) {
+ for (PartitionManager pm : pms) {
concatMetricsDataMaps.putAll(pm.getMetricsDataMap());
}
return concatMetricsDataMaps;
@@ -104,27 +111,27 @@ public class KafkaSpout extends BaseRichSpout {
@Override
public void close() {
- _state.close();
+ _state.close();
}
@Override
public void nextTuple() {
List<PartitionManager> managers = _coordinator.getMyManagedPartitions();
- for(int i=0; i<managers.size(); i++) {
-
+ for (int i = 0; i < managers.size(); i++) {
+
// in case the number of managers decreased
_currPartitionIndex = _currPartitionIndex % managers.size();
EmitState state = managers.get(_currPartitionIndex).next(_collector);
- if(state!=EmitState.EMITTED_MORE_LEFT) {
+ if (state != EmitState.EMITTED_MORE_LEFT) {
_currPartitionIndex = (_currPartitionIndex + 1) % managers.size();
}
- if(state!=EmitState.NO_EMITTED) {
+ if (state != EmitState.NO_EMITTED) {
break;
}
}
long now = System.currentTimeMillis();
- if((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
+ if ((now - _lastUpdateMs) > _spoutConfig.stateUpdateIntervalMs) {
commit();
}
}
@@ -133,18 +140,18 @@ public class KafkaSpout extends BaseRichSpout {
public void ack(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
- if(m!=null) {
+ if (m != null) {
m.ack(id.offset);
- }
+ }
}
@Override
public void fail(Object msgId) {
KafkaMessageId id = (KafkaMessageId) msgId;
PartitionManager m = _coordinator.getManager(id.partition);
- if(m!=null) {
+ if (m != null) {
m.fail(id.offset);
- }
+ }
}
@Override
@@ -159,7 +166,7 @@ public class KafkaSpout extends BaseRichSpout {
private void commit() {
_lastUpdateMs = System.currentTimeMillis();
- for(PartitionManager manager: _coordinator.getMyManagedPartitions()) {
+ for (PartitionManager manager : _coordinator.getMyManagedPartitions()) {
manager.commit();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/Partition.java b/src/jvm/storm/kafka/Partition.java
index 6de0f6a..87ab7b8 100644
--- a/src/jvm/storm/kafka/Partition.java
+++ b/src/jvm/storm/kafka/Partition.java
@@ -6,7 +6,7 @@ import storm.trident.spout.ISpoutPartition;
public class Partition implements ISpoutPartition {
- public final HostPort host;
+ public final HostPort host;
public final int partition;
public Partition(HostPort host, int partition) {
@@ -14,30 +14,30 @@ public class Partition implements ISpoutPartition {
this.partition = partition;
}
- @Override
- public int hashCode() {
- return Objects.hashCode(host, partition);
- }
-
- @Override
- public boolean equals(Object obj) {
- if (this == obj) {
- return true;
- }
- if (obj == null || getClass() != obj.getClass()) {
- return false;
- }
- final Partition other = (Partition) obj;
- return Objects.equal(this.host, other.host) && Objects.equal(this.partition, other.partition);
- }
-
- @Override
- public String toString() {
- return "Partition{" +
- "host=" + host +
- ", partition=" + partition +
- '}';
- }
+ @Override
+ public int hashCode() {
+ return Objects.hashCode(host, partition);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj == null || getClass() != obj.getClass()) {
+ return false;
+ }
+ final Partition other = (Partition) obj;
+ return Objects.equal(this.host, other.host) && Objects.equal(this.partition, other.partition);
+ }
+
+ @Override
+ public String toString() {
+ return "Partition{" +
+ "host=" + host +
+ ", partition=" + partition +
+ '}';
+ }
@Override
public String getId() {
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/PartitionCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionCoordinator.java b/src/jvm/storm/kafka/PartitionCoordinator.java
index 2ee2009..d28248d 100644
--- a/src/jvm/storm/kafka/PartitionCoordinator.java
+++ b/src/jvm/storm/kafka/PartitionCoordinator.java
@@ -4,5 +4,6 @@ import java.util.List;
public interface PartitionCoordinator {
List<PartitionManager> getMyManagedPartitions();
+
PartitionManager getManager(Partition partition);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index 6596f96..623bc10 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -58,31 +58,30 @@ public class PartitionManager {
_spoutConfig = spoutConfig;
_topologyInstanceId = topologyInstanceId;
_consumer = connections.register(id.host, id.partition);
- _state = state;
+ _state = state;
_stormConf = stormConf;
String jsonTopologyId = null;
Long jsonOffset = null;
try {
Map<Object, Object> json = _state.readJSON(committedPath());
- if(json != null) {
- jsonTopologyId = (String)((Map<Object,Object>)json.get("topology")).get("id");
- jsonOffset = (Long)json.get("offset");
+ if (json != null) {
+ jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
+ jsonOffset = (Long) json.get("offset");
}
- }
- catch(Throwable e) {
+ } catch (Throwable e) {
LOG.warn("Error reading and/or parsing at ZkNode: " + committedPath(), e);
}
- if(!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
+ if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
_committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, spoutConfig.startOffsetTime);
- LOG.info("Using startOffsetTime to choose last commit offset.");
- } else if(jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
- _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, kafka.api.OffsetRequest.LatestTime());
- LOG.info("Setting last commit offset to HEAD.");
+ LOG.info("Using startOffsetTime to choose last commit offset.");
+ } else if (jsonTopologyId == null || jsonOffset == null) { // failed to parse JSON?
+ _committedTo = KafkaUtils.getOffset(_consumer, spoutConfig.topic, id.partition, kafka.api.OffsetRequest.LatestTime());
+ LOG.info("Setting last commit offset to HEAD.");
} else {
_committedTo = jsonOffset;
- LOG.info("Read last commit offset from zookeeper: " + _committedTo);
+ LOG.info("Read last commit offset from zookeeper: " + _committedTo);
}
LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
@@ -105,22 +104,25 @@ public class PartitionManager {
//returns false if it's reached the end of current batch
public EmitState next(SpoutOutputCollector collector) {
- if(_waitingToEmit.isEmpty()) fill();
- while(true) {
+ if (_waitingToEmit.isEmpty()) {
+ fill();
+ }
+ while (true) {
MessageAndRealOffset toEmit = _waitingToEmit.pollFirst();
- if(toEmit==null) {
+ if (toEmit == null) {
return EmitState.NO_EMITTED;
}
Iterable<List<Object>> tups = _spoutConfig.scheme.deserialize(Utils.toByteArray(toEmit.msg.payload()));
- if(tups!=null) {
- for(List<Object> tup: tups)
+ if (tups != null) {
+ for (List<Object> tup : tups) {
collector.emit(tup, new KafkaMessageId(_partition, toEmit.offset));
+ }
break;
} else {
ack(toEmit.offset);
}
}
- if(!_waitingToEmit.isEmpty()) {
+ if (!_waitingToEmit.isEmpty()) {
return EmitState.EMITTED_MORE_LEFT;
} else {
return EmitState.EMITTED_END;
@@ -132,11 +134,11 @@ public class PartitionManager {
long start = System.nanoTime();
ByteBufferMessageSet msgs = _consumer.fetch(
new FetchRequestBuilder().addFetch(
- _spoutConfig.topic,
- _partition.partition,
- _emittedToOffset,
- _spoutConfig.fetchSizeBytes).build()).messageSet(_spoutConfig.topic,
- _partition.partition);
+ _spoutConfig.topic,
+ _partition.partition,
+ _emittedToOffset,
+ _spoutConfig.fetchSizeBytes).build()).messageSet(_spoutConfig.topic,
+ _partition.partition);
long end = System.nanoTime();
long millis = (end - start) / 1000000;
_fetchAPILatencyMax.update(millis);
@@ -145,26 +147,26 @@ public class PartitionManager {
int numMessages = countMessages(msgs);
_fetchAPIMessageCount.incrBy(numMessages);
- if(numMessages>0) {
- LOG.info("Fetched " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition);
+ if (numMessages > 0) {
+ LOG.info("Fetched " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition);
}
- for(MessageAndOffset msg: msgs) {
+ for (MessageAndOffset msg : msgs) {
_pending.add(_emittedToOffset);
_waitingToEmit.add(new MessageAndRealOffset(msg.message(), _emittedToOffset));
_emittedToOffset = msg.nextOffset();
}
- if(numMessages>0) {
- LOG.info("Added " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition + " to internal buffers");
+ if (numMessages > 0) {
+ LOG.info("Added " + numMessages + " messages from Kafka: " + _consumer.host() + ":" + _partition.partition + " to internal buffers");
}
}
- private int countMessages(ByteBufferMessageSet messageSet) {
- int counter = 0;
- for (MessageAndOffset messageAndOffset : messageSet) {
- counter = counter + 1;
- }
- return counter;
- }
+ private int countMessages(ByteBufferMessageSet messageSet) {
+ int counter = 0;
+ for (MessageAndOffset messageAndOffset : messageSet) {
+ counter = counter + 1;
+ }
+ return counter;
+ }
public void ack(Long offset) {
_pending.remove(offset);
@@ -173,7 +175,7 @@ public class PartitionManager {
public void fail(Long offset) {
//TODO: should it use in-memory ack set to skip anything that's been acked but not committed???
// things might get crazy with lots of timeouts
- if(_emittedToOffset > offset) {
+ if (_emittedToOffset > offset) {
_emittedToOffset = offset;
_pending.tailSet(offset).clear();
}
@@ -182,23 +184,23 @@ public class PartitionManager {
public void commit() {
LOG.info("Committing offset for " + _partition);
long committedTo;
- if(_pending.isEmpty()) {
+ if (_pending.isEmpty()) {
committedTo = _emittedToOffset;
} else {
committedTo = _pending.first();
}
- if(committedTo!=_committedTo) {
+ if (committedTo != _committedTo) {
LOG.info("Writing committed offset to ZK: " + committedTo);
- Map<Object, Object> data = (Map<Object,Object>)ImmutableMap.builder()
- .put("topology", ImmutableMap.of("id", _topologyInstanceId,
- "name", _stormConf.get(Config.TOPOLOGY_NAME)))
- .put("offset", committedTo)
- .put("partition", _partition.partition)
- .put("broker", ImmutableMap.of("host", _partition.host.host,
- "port", _partition.host.port))
- .put("topic", _spoutConfig.topic).build();
- _state.writeJSON(committedPath(), data);
+ Map<Object, Object> data = (Map<Object, Object>) ImmutableMap.builder()
+ .put("topology", ImmutableMap.of("id", _topologyInstanceId,
+ "name", _stormConf.get(Config.TOPOLOGY_NAME)))
+ .put("offset", committedTo)
+ .put("partition", _partition.partition)
+ .put("broker", ImmutableMap.of("host", _partition.host.host,
+ "port", _partition.host.port))
+ .put("topic", _spoutConfig.topic).build();
+ _state.writeJSON(committedPath(), data);
LOG.info("Wrote committed offset to ZK: " + committedTo);
_committedTo = committedTo;
@@ -212,7 +214,7 @@ public class PartitionManager {
public long queryPartitionOffsetLatestTime() {
return KafkaUtils.getOffset(_consumer, _spoutConfig.topic, _partition.partition,
- OffsetRequest.LatestTime());
+ OffsetRequest.LatestTime());
}
public long lastCommittedOffset() {
@@ -220,7 +222,7 @@ public class PartitionManager {
}
public long lastCompletedOffset() {
- if(_pending.isEmpty()) {
+ if (_pending.isEmpty()) {
return _emittedToOffset;
} else {
return _pending.first();
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/StaticCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticCoordinator.java b/src/jvm/storm/kafka/StaticCoordinator.java
index 6f97c8b..7415522 100644
--- a/src/jvm/storm/kafka/StaticCoordinator.java
+++ b/src/jvm/storm/kafka/StaticCoordinator.java
@@ -13,22 +13,22 @@ public class StaticCoordinator implements PartitionCoordinator {
public StaticCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig config, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
StaticHosts hosts = (StaticHosts) config.hosts;
List<Partition> partitions = hosts.getPartitionInformation().getOrderedPartitions();
- for(int i=taskIndex; i<partitions.size(); i+=totalTasks) {
+ for (int i = taskIndex; i < partitions.size(); i += totalTasks) {
Partition myPartition = partitions.get(i);
_managers.put(myPartition, new PartitionManager(connections, topologyInstanceId, state, stormConf, config, myPartition));
-
+
}
-
+
_allManagers = new ArrayList(_managers.values());
}
-
+
@Override
public List<PartitionManager> getMyManagedPartitions() {
return _allManagers;
}
-
+
public PartitionManager getManager(Partition partition) {
return _managers.get(partition);
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/StaticHosts.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticHosts.java b/src/jvm/storm/kafka/StaticHosts.java
index 6ed828d..9ed7193 100644
--- a/src/jvm/storm/kafka/StaticHosts.java
+++ b/src/jvm/storm/kafka/StaticHosts.java
@@ -2,9 +2,6 @@ package storm.kafka;
import storm.kafka.trident.GlobalPartitionInformation;
-import java.util.ArrayList;
-import java.util.List;
-
/**
* Date: 11/05/2013
* Time: 14:43
@@ -12,13 +9,13 @@ import java.util.List;
public class StaticHosts implements BrokerHosts {
- private GlobalPartitionInformation partitionInformation;
+ private GlobalPartitionInformation partitionInformation;
- public StaticHosts(GlobalPartitionInformation partitionInformation) {
- this.partitionInformation = partitionInformation;
- }
+ public StaticHosts(GlobalPartitionInformation partitionInformation) {
+ this.partitionInformation = partitionInformation;
+ }
- public GlobalPartitionInformation getPartitionInformation() {
- return partitionInformation;
- }
+ public GlobalPartitionInformation getPartitionInformation() {
+ return partitionInformation;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/StaticPartitionConnections.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/StaticPartitionConnections.java b/src/jvm/storm/kafka/StaticPartitionConnections.java
index 2d40c8b..4294362 100644
--- a/src/jvm/storm/kafka/StaticPartitionConnections.java
+++ b/src/jvm/storm/kafka/StaticPartitionConnections.java
@@ -9,17 +9,17 @@ public class StaticPartitionConnections {
Map<Integer, SimpleConsumer> _kafka = new HashMap<Integer, SimpleConsumer>();
KafkaConfig _config;
StaticHosts hosts;
-
+
public StaticPartitionConnections(KafkaConfig conf) {
_config = conf;
- if(!(conf.hosts instanceof StaticHosts)) {
+ if (!(conf.hosts instanceof StaticHosts)) {
throw new RuntimeException("Must configure with static hosts");
}
this.hosts = (StaticHosts) conf.hosts;
}
public SimpleConsumer getConsumer(int partition) {
- if(!_kafka.containsKey(partition)) {
+ if (!_kafka.containsKey(partition)) {
HostPort hp = hosts.getPartitionInformation().getHostFor(partition);
_kafka.put(partition, new SimpleConsumer(hp.host, hp.port, _config.socketTimeoutMs, _config.bufferSizeBytes, _config.clientId));
@@ -28,7 +28,7 @@ public class StaticPartitionConnections {
}
public void close() {
- for(SimpleConsumer consumer: _kafka.values()) {
+ for (SimpleConsumer consumer : _kafka.values()) {
consumer.close();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/ZkCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkCoordinator.java b/src/jvm/storm/kafka/ZkCoordinator.java
index d457bdd..98e51a3 100644
--- a/src/jvm/storm/kafka/ZkCoordinator.java
+++ b/src/jvm/storm/kafka/ZkCoordinator.java
@@ -9,7 +9,7 @@ import java.util.*;
public class ZkCoordinator implements PartitionCoordinator {
public static final Logger LOG = LoggerFactory.getLogger(ZkCoordinator.class);
-
+
SpoutConfig _spoutConfig;
int _taskIndex;
int _totalTasks;
@@ -23,7 +23,7 @@ public class ZkCoordinator implements PartitionCoordinator {
ZkState _state;
Map _stormConf;
IMetricsContext _metricsContext;
-
+
public ZkCoordinator(DynamicPartitionConnections connections, Map stormConf, SpoutConfig spoutConfig, ZkState state, int taskIndex, int totalTasks, String topologyInstanceId) {
_spoutConfig = spoutConfig;
_connections = connections;
@@ -31,55 +31,55 @@ public class ZkCoordinator implements PartitionCoordinator {
_totalTasks = totalTasks;
_topologyInstanceId = topologyInstanceId;
_stormConf = stormConf;
- _state = state;
+ _state = state;
ZkHosts brokerConf = (ZkHosts) spoutConfig.hosts;
_refreshFreqMs = brokerConf.refreshFreqSecs * 1000;
_reader = new DynamicBrokersReader(stormConf, brokerConf.brokerZkStr, brokerConf.brokerZkPath, spoutConfig.topic);
-
+
}
-
+
@Override
public List<PartitionManager> getMyManagedPartitions() {
- if(_lastRefreshTime==null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
+ if (_lastRefreshTime == null || (System.currentTimeMillis() - _lastRefreshTime) > _refreshFreqMs) {
refresh();
_lastRefreshTime = System.currentTimeMillis();
}
return _cachedList;
}
-
+
void refresh() {
try {
LOG.info("Refreshing partition manager connections");
- GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
+ GlobalPartitionInformation brokerInfo = _reader.getBrokerInfo();
Set<Partition> mine = new HashSet();
- for (Partition partitionId: brokerInfo){
- if(myOwnership(partitionId)) {
- mine.add(partitionId);
- }
- }
+ for (Partition partitionId : brokerInfo) {
+ if (myOwnership(partitionId)) {
+ mine.add(partitionId);
+ }
+ }
Set<Partition> curr = _managers.keySet();
Set<Partition> newPartitions = new HashSet<Partition>(mine);
newPartitions.removeAll(curr);
-
+
Set<Partition> deletedPartitions = new HashSet<Partition>(curr);
deletedPartitions.removeAll(mine);
-
+
LOG.info("Deleted partition managers: " + deletedPartitions.toString());
-
- for(Partition id: deletedPartitions) {
+
+ for (Partition id : deletedPartitions) {
PartitionManager man = _managers.remove(id);
man.close();
}
LOG.info("New partition managers: " + newPartitions.toString());
-
- for(Partition id: newPartitions) {
+
+ for (Partition id : newPartitions) {
PartitionManager man = new PartitionManager(_connections, _topologyInstanceId, _state, _stormConf, _spoutConfig, id);
_managers.put(id, man);
}
-
- } catch(Exception e) {
+
+ } catch (Exception e) {
throw new RuntimeException(e);
}
_cachedList = new ArrayList<PartitionManager>(_managers.values());
@@ -90,9 +90,9 @@ public class ZkCoordinator implements PartitionCoordinator {
public PartitionManager getManager(Partition partition) {
return _managers.get(partition);
}
-
+
private boolean myOwnership(Partition id) {
- int val = Math.abs(id.host.hashCode() + 23 * id.partition);
+ int val = Math.abs(id.host.hashCode() + 23 * id.partition);
return val % _totalTasks == _taskIndex;
- }
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/ZkHosts.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkHosts.java b/src/jvm/storm/kafka/ZkHosts.java
index dfe3b4c..f2e0fc2 100644
--- a/src/jvm/storm/kafka/ZkHosts.java
+++ b/src/jvm/storm/kafka/ZkHosts.java
@@ -5,18 +5,18 @@ package storm.kafka;
* Time: 14:38
*/
public class ZkHosts implements BrokerHosts {
- private static final String DEFAULT_ZK_PATH = "/brokers";
+ private static final String DEFAULT_ZK_PATH = "/brokers";
- public String brokerZkStr = null;
- public String brokerZkPath = null; // e.g., /kafka/brokers
- public int refreshFreqSecs = 60;
+ public String brokerZkStr = null;
+ public String brokerZkPath = null; // e.g., /kafka/brokers
+ public int refreshFreqSecs = 60;
- public ZkHosts(String brokerZkStr, String brokerZkPath) {
- this.brokerZkStr = brokerZkStr;
- this.brokerZkPath = brokerZkPath;
- }
+ public ZkHosts(String brokerZkStr, String brokerZkPath) {
+ this.brokerZkStr = brokerZkStr;
+ this.brokerZkPath = brokerZkPath;
+ }
- public ZkHosts(String brokerZkStr) {
- this(brokerZkStr, DEFAULT_ZK_PATH);
- }
+ public ZkHosts(String brokerZkStr) {
+ this(brokerZkStr, DEFAULT_ZK_PATH);
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/ZkState.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/ZkState.java b/src/jvm/storm/kafka/ZkState.java
index 17ebb25..d5416af 100644
--- a/src/jvm/storm/kafka/ZkState.java
+++ b/src/jvm/storm/kafka/ZkState.java
@@ -20,78 +20,80 @@ public class ZkState {
CuratorFramework _curator;
private CuratorFramework newCurator(Map stateConf) throws Exception {
- Integer port = (Integer)stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
- String serverPorts = "";
- for(String server: (List<String>)stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
+ Integer port = (Integer) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_PORT);
+ String serverPorts = "";
+ for (String server : (List<String>) stateConf.get(Config.TRANSACTIONAL_ZOOKEEPER_SERVERS)) {
serverPorts = serverPorts + server + ":" + port + ",";
}
- return CuratorFrameworkFactory.newClient(serverPorts,
- Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
- 15000,
- new RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
- Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+ return CuratorFrameworkFactory.newClient(serverPorts,
+ Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+ 15000,
+ new RetryNTimes(Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+ Utils.getInt(stateConf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
}
public CuratorFramework getCurator() {
- assert _curator != null;
+ assert _curator != null;
return _curator;
}
public ZkState(Map stateConf) {
- stateConf = new HashMap(stateConf);
+ stateConf = new HashMap(stateConf);
- try {
- _curator = newCurator(stateConf);
- _curator.start();
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
+ try {
+ _curator = newCurator(stateConf);
+ _curator.start();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
- public void writeJSON(String path, Map<Object,Object> data) {
- LOG.info("Writing " + path + " the data " + data.toString());
+ public void writeJSON(String path, Map<Object, Object> data) {
+ LOG.info("Writing " + path + " the data " + data.toString());
writeBytes(path, JSONValue.toJSONString(data).getBytes(Charset.forName("UTF-8")));
}
public void writeBytes(String path, byte[] bytes) {
try {
- if(_curator.checkExists().forPath(path)==null) {
+ if (_curator.checkExists().forPath(path) == null) {
_curator.create()
.creatingParentsIfNeeded()
.withMode(CreateMode.PERSISTENT)
.forPath(path, bytes);
} else {
- _curator.setData().forPath(path, bytes);
- }
- } catch(Exception e) {
+ _curator.setData().forPath(path, bytes);
+ }
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
- public Map<Object,Object> readJSON(String path) {
- try {
- byte[] b = readBytes(path);
- if(b==null) return null;
- return (Map<Object,Object>)JSONValue.parse(new String(b, "UTF-8"));
- } catch(Exception e) {
- throw new RuntimeException(e);
- }
+ public Map<Object, Object> readJSON(String path) {
+ try {
+ byte[] b = readBytes(path);
+ if (b == null) {
+ return null;
+ }
+ return (Map<Object, Object>) JSONValue.parse(new String(b, "UTF-8"));
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
}
public byte[] readBytes(String path) {
try {
- if(_curator.checkExists().forPath(path)!=null) {
- return _curator.getData().forPath(path);
+ if (_curator.checkExists().forPath(path) != null) {
+ return _curator.getData().forPath(path);
} else {
return null;
}
- } catch(Exception e) {
+ } catch (Exception e) {
throw new RuntimeException(e);
}
}
public void close() {
- _curator.close();
- _curator = null;
+ _curator.close();
+ _curator = null;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/Coordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/Coordinator.java b/src/jvm/storm/kafka/trident/Coordinator.java
index b7ddd3f..d97feed 100644
--- a/src/jvm/storm/kafka/trident/Coordinator.java
+++ b/src/jvm/storm/kafka/trident/Coordinator.java
@@ -11,26 +11,26 @@ import java.util.Map;
*/
class Coordinator implements IPartitionedTridentSpout.Coordinator<GlobalPartitionInformation>, IOpaquePartitionedTridentSpout.Coordinator<GlobalPartitionInformation> {
- private IBrokerReader reader;
- private TridentKafkaConfig config;
-
- public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
- config = tridentKafkaConfig;
- reader = KafkaUtils.makeBrokerReader(conf, config);
- }
-
- @Override
- public void close() {
- config.coordinator.close();
- }
-
- @Override
- public boolean isReady(long txid) {
- return config.coordinator.isReady(txid);
- }
-
- @Override
- public GlobalPartitionInformation getPartitionsForBatch() {
- return reader.getCurrentBrokers();
- }
+ private IBrokerReader reader;
+ private TridentKafkaConfig config;
+
+ public Coordinator(Map conf, TridentKafkaConfig tridentKafkaConfig) {
+ config = tridentKafkaConfig;
+ reader = KafkaUtils.makeBrokerReader(conf, config);
+ }
+
+ @Override
+ public void close() {
+ config.coordinator.close();
+ }
+
+ @Override
+ public boolean isReady(long txid) {
+ return config.coordinator.isReady(txid);
+ }
+
+ @Override
+ public GlobalPartitionInformation getPartitionsForBatch() {
+ return reader.getCurrentBrokers();
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/DefaultCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/DefaultCoordinator.java b/src/jvm/storm/kafka/trident/DefaultCoordinator.java
index 3a47706..89cd503 100644
--- a/src/jvm/storm/kafka/trident/DefaultCoordinator.java
+++ b/src/jvm/storm/kafka/trident/DefaultCoordinator.java
@@ -10,5 +10,5 @@ public class DefaultCoordinator implements IBatchCoordinator {
@Override
public void close() {
}
-
+
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java b/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
index 139a0d7..6b0fdec 100644
--- a/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
+++ b/src/jvm/storm/kafka/trident/GlobalPartitionInformation.java
@@ -1,7 +1,7 @@
package storm.kafka.trident;
-import storm.kafka.Partition;
import storm.kafka.HostPort;
+import storm.kafka.Partition;
import java.io.Serializable;
import java.util.*;
@@ -12,55 +12,55 @@ import java.util.*;
*/
public class GlobalPartitionInformation implements Iterable<Partition>, Serializable {
- private Map<Integer, HostPort> partitionMap;
+ private Map<Integer, HostPort> partitionMap;
- public GlobalPartitionInformation() {
- partitionMap = new TreeMap<Integer, HostPort>();
- }
+ public GlobalPartitionInformation() {
+ partitionMap = new TreeMap<Integer, HostPort>();
+ }
- public void addPartition(int partitionId, HostPort broker) {
- partitionMap.put(partitionId, broker);
- }
+ public void addPartition(int partitionId, HostPort broker) {
+ partitionMap.put(partitionId, broker);
+ }
- @Override
- public String toString() {
- return "GlobalPartitionInformation{" +
- "partitionMap=" + partitionMap +
- '}';
- }
+ @Override
+ public String toString() {
+ return "GlobalPartitionInformation{" +
+ "partitionMap=" + partitionMap +
+ '}';
+ }
- public HostPort getHostFor(Integer partitionId) {
- return partitionMap.get(partitionId);
- }
+ public HostPort getHostFor(Integer partitionId) {
+ return partitionMap.get(partitionId);
+ }
- public List<Partition> getOrderedPartitions(){
- List<Partition> partitions = new LinkedList<Partition>();
- for (Map.Entry<Integer, HostPort> partition : partitionMap.entrySet()) {
- partitions.add(new Partition(partition.getValue(), partition.getKey()));
- }
- return partitions;
- }
+ public List<Partition> getOrderedPartitions() {
+ List<Partition> partitions = new LinkedList<Partition>();
+ for (Map.Entry<Integer, HostPort> partition : partitionMap.entrySet()) {
+ partitions.add(new Partition(partition.getValue(), partition.getKey()));
+ }
+ return partitions;
+ }
- @Override
- public Iterator<Partition> iterator() {
- final Iterator<Map.Entry<Integer, HostPort>> iterator = partitionMap.entrySet().iterator();
+ @Override
+ public Iterator<Partition> iterator() {
+ final Iterator<Map.Entry<Integer, HostPort>> iterator = partitionMap.entrySet().iterator();
- return new Iterator<Partition>() {
- @Override
- public boolean hasNext() {
- return iterator.hasNext();
- }
+ return new Iterator<Partition>() {
+ @Override
+ public boolean hasNext() {
+ return iterator.hasNext();
+ }
- @Override
- public Partition next() {
- Map.Entry<Integer, HostPort> next = iterator.next();
- return new Partition(next.getValue(), next.getKey());
- }
+ @Override
+ public Partition next() {
+ Map.Entry<Integer, HostPort> next = iterator.next();
+ return new Partition(next.getValue(), next.getKey());
+ }
- @Override
- public void remove() {
- iterator.remove();
- }
- };
- }
+ @Override
+ public void remove() {
+ iterator.remove();
+ }
+ };
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/IBatchCoordinator.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/IBatchCoordinator.java b/src/jvm/storm/kafka/trident/IBatchCoordinator.java
index 9199a8d..1b8a8ce 100644
--- a/src/jvm/storm/kafka/trident/IBatchCoordinator.java
+++ b/src/jvm/storm/kafka/trident/IBatchCoordinator.java
@@ -4,5 +4,6 @@ import java.io.Serializable;
public interface IBatchCoordinator extends Serializable {
boolean isReady(long txid);
+
void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/IBrokerReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/IBrokerReader.java b/src/jvm/storm/kafka/trident/IBrokerReader.java
index 4e2421b..73c9738 100644
--- a/src/jvm/storm/kafka/trident/IBrokerReader.java
+++ b/src/jvm/storm/kafka/trident/IBrokerReader.java
@@ -3,5 +3,6 @@ package storm.kafka.trident;
public interface IBrokerReader {
GlobalPartitionInformation getCurrentBrokers();
+
void close();
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/KafkaUtils.java b/src/jvm/storm/kafka/trident/KafkaUtils.java
index efe4fef..18dd851 100644
--- a/src/jvm/storm/kafka/trident/KafkaUtils.java
+++ b/src/jvm/storm/kafka/trident/KafkaUtils.java
@@ -16,33 +16,33 @@ import java.util.Set;
public class KafkaUtils {
public static final Logger LOG = LoggerFactory.getLogger(KafkaUtils.class);
- private static final int NO_OFFSET = -5;
+ private static final int NO_OFFSET = -5;
- public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
- if(conf.hosts instanceof StaticHosts) {
- return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
- } else {
- return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
- }
- }
+ public static IBrokerReader makeBrokerReader(Map stormConf, KafkaConfig conf) {
+ if (conf.hosts instanceof StaticHosts) {
+ return new StaticBrokerReader(((StaticHosts) conf.hosts).getPartitionInformation());
+ } else {
+ return new ZkBrokerReader(stormConf, conf.topic, (ZkHosts) conf.hosts);
+ }
+ }
- public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
- TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
- Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
- requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
- OffsetRequest request = new OffsetRequest(
- requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
+ public static long getOffset(SimpleConsumer consumer, String topic, int partition, long startOffsetTime) {
+ TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
+ Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
+ requestInfo.put(topicAndPartition, new PartitionOffsetRequestInfo(startOffsetTime, 1));
+ OffsetRequest request = new OffsetRequest(
+ requestInfo, kafka.api.OffsetRequest.CurrentVersion(), consumer.clientId());
- long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
- if ( offsets.length > 0) {
- return offsets[0];
- } else {
- return NO_OFFSET;
- }
- }
+ long[] offsets = consumer.getOffsetsBefore(request).offsets(topic, partition);
+ if (offsets.length > 0) {
+ return offsets[0];
+ } else {
+ return NO_OFFSET;
+ }
+ }
- public static class KafkaOffsetMetric implements IMetric {
+ public static class KafkaOffsetMetric implements IMetric {
Map<Partition, Long> _partitionToOffset = new HashMap<Partition, Long>();
Set<Partition> _partitions;
String _topic;
@@ -64,16 +64,16 @@ public class KafkaUtils {
long totalLatestTimeOffset = 0;
long totalLatestEmittedOffset = 0;
HashMap ret = new HashMap();
- if(_partitions != null && _partitions.size() == _partitionToOffset.size()) {
- for(Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
+ if (_partitions != null && _partitions.size() == _partitionToOffset.size()) {
+ for (Map.Entry<Partition, Long> e : _partitionToOffset.entrySet()) {
Partition partition = e.getKey();
SimpleConsumer consumer = _connections.getConnection(partition);
- if(consumer == null) {
+ if (consumer == null) {
LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
return null;
}
long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
- if(latestTimeOffset == 0) {
+ if (latestTimeOffset == 0) {
LOG.warn("No data found in Kafka Partition " + partition.getId());
return null;
}
@@ -93,18 +93,22 @@ public class KafkaUtils {
} else {
LOG.info("Metrics Tick: Not enough data to calculate spout lag.");
}
- } catch(Throwable t) {
+ } catch (Throwable t) {
LOG.warn("Metrics Tick: Exception when computing kafkaOffset metric.", t);
}
return null;
}
- public void refreshPartitions(Set<Partition> partitions) {
- _partitions = partitions;
- Iterator<Partition> it = _partitionToOffset.keySet().iterator();
- while(it.hasNext()) {
- if(!partitions.contains(it.next())) it.remove();
- }
- }
- };
+ public void refreshPartitions(Set<Partition> partitions) {
+ _partitions = partitions;
+ Iterator<Partition> it = _partitionToOffset.keySet().iterator();
+ while (it.hasNext()) {
+ if (!partitions.contains(it.next())) {
+ it.remove();
+ }
+ }
+ }
+ }
+
+ ;
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/MaxMetric.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/MaxMetric.java b/src/jvm/storm/kafka/trident/MaxMetric.java
index 087245f..a8f88ba 100644
--- a/src/jvm/storm/kafka/trident/MaxMetric.java
+++ b/src/jvm/storm/kafka/trident/MaxMetric.java
@@ -11,8 +11,12 @@ public class MaxMetric implements ICombiner<Long> {
@Override
public Long combine(Long l1, Long l2) {
- if(l1 == null) return l2;
- if(l2 == null) return l1;
+ if (l1 == null) {
+ return l2;
+ }
+ if (l2 == null) {
+ return l1;
+ }
return Math.max(l1, l2);
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java b/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
index 0f6e6c5..35b7033 100644
--- a/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
+++ b/src/jvm/storm/kafka/trident/OpaqueTridentKafkaSpout.java
@@ -11,19 +11,19 @@ import java.util.UUID;
public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
-
+
TridentKafkaConfig _config;
String _topologyInstanceId = UUID.randomUUID().toString();
-
+
public OpaqueTridentKafkaSpout(TridentKafkaConfig config) {
_config = config;
}
-
+
@Override
public IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map> getEmitter(Map conf, TopologyContext context) {
- return new TridentKafkaEmitter(conf, context, _config, _topologyInstanceId).asOpaqueEmitter();
+ return new TridentKafkaEmitter(conf, context, _config, _topologyInstanceId).asOpaqueEmitter();
}
-
+
@Override
public IOpaquePartitionedTridentSpout.Coordinator getCoordinator(Map conf, TopologyContext tc) {
return new storm.kafka.trident.Coordinator(conf, _config);
@@ -32,8 +32,8 @@ public class OpaqueTridentKafkaSpout implements IOpaquePartitionedTridentSpout<G
@Override
public Fields getOutputFields() {
return _config.scheme.getOutputFields();
- }
-
+ }
+
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/StaticBrokerReader.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/StaticBrokerReader.java b/src/jvm/storm/kafka/trident/StaticBrokerReader.java
index dabbd5e..98a8f53 100644
--- a/src/jvm/storm/kafka/trident/StaticBrokerReader.java
+++ b/src/jvm/storm/kafka/trident/StaticBrokerReader.java
@@ -2,12 +2,12 @@ package storm.kafka.trident;
public class StaticBrokerReader implements IBrokerReader {
- private GlobalPartitionInformation brokers = new GlobalPartitionInformation();
-
+ private GlobalPartitionInformation brokers = new GlobalPartitionInformation();
+
public StaticBrokerReader(GlobalPartitionInformation partitionInformation) {
this.brokers = partitionInformation;
}
-
+
@Override
public GlobalPartitionInformation getCurrentBrokers() {
return brokers;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java b/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
index 173a98f..b32d301 100644
--- a/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
+++ b/src/jvm/storm/kafka/trident/TransactionalTridentKafkaSpout.java
@@ -10,7 +10,7 @@ import java.util.UUID;
public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<GlobalPartitionInformation, Partition, Map> {
-
+
TridentKafkaConfig _config;
String _topologyInstanceId = UUID.randomUUID().toString();
@@ -26,14 +26,14 @@ public class TransactionalTridentKafkaSpout implements IPartitionedTridentSpout<
@Override
public IPartitionedTridentSpout.Emitter getEmitter(Map conf, TopologyContext context) {
- return new TridentKafkaEmitter(conf, context, _config, _topologyInstanceId).asTransactionalEmitter();
+ return new TridentKafkaEmitter(conf, context, _config, _topologyInstanceId).asTransactionalEmitter();
}
@Override
public Fields getOutputFields() {
- return _config.scheme.getOutputFields();
+ return _config.scheme.getOutputFields();
}
-
+
@Override
public Map<String, Object> getComponentConfiguration() {
return null;
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaConfig.java b/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
index 7195500..073afa2 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaConfig.java
@@ -13,8 +13,8 @@ public class TridentKafkaConfig extends KafkaConfig {
super(hosts, topic);
}
- public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
- super(hosts, topic, clientId);
- }
+ public TridentKafkaConfig(BrokerHosts hosts, String topic, String clientId) {
+ super(hosts, topic, clientId);
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/e8f54d63/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
index 282e67a..ab4ec63 100644
--- a/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/src/jvm/storm/kafka/trident/TridentKafkaEmitter.java
@@ -34,237 +34,241 @@ import java.util.Map;
*/
public class TridentKafkaEmitter {
- public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
-
- private DynamicPartitionConnections _connections;
- private String _topologyName;
- private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
- private ReducedMetric _kafkaMeanFetchLatencyMetric;
- private CombinedMetric _kafkaMaxFetchLatencyMetric;
- private TridentKafkaConfig _config;
- private String _topologyInstanceId;
-
- public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
- _config = config;
- _topologyInstanceId = topologyInstanceId;
- _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
- _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
- _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_config.topic, _connections);
- context.registerMetric("kafkaOffset", _kafkaOffsetMetric, 60);
- _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), 60);
- _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), 60);
- }
-
-
- private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
- SimpleConsumer consumer = _connections.register(partition);
- Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
- _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
- return ret;
- }
-
- private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
- try {
- return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
- } catch (FailedFetchException e) {
- LOG.warn("Failed to fetch from partition " + partition);
- if (lastMeta == null) {
- return null;
- } else {
- Map ret = new HashMap();
- ret.put("offset", lastMeta.get("nextOffset"));
- ret.put("nextOffset", lastMeta.get("nextOffset"));
- ret.put("partition", partition.partition);
- ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
- ret.put("topic", _config.topic);
- ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
- return ret;
- }
- }
- }
-
- private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) {
- long offset;
- if (lastMeta != null) {
- String lastInstanceId = null;
- Map lastTopoMeta = (Map) lastMeta.get("topology");
- if (lastTopoMeta != null) {
- lastInstanceId = (String) lastTopoMeta.get("id");
- }
- if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) {
- offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
- } else {
- offset = (Long) lastMeta.get("nextOffset");
- }
- } else {
- long startTime = kafka.api.OffsetRequest.LatestTime();
- if (_config.forceFromStart) startTime = _config.startOffsetTime;
- offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, startTime);
- }
- ByteBufferMessageSet msgs;
- try {
- msgs = fetchMessages(consumer, partition, offset);
- } catch (Exception e) {
- if (e instanceof ConnectException) {
- throw new FailedFetchException(e);
- } else {
- throw new RuntimeException(e);
- }
- }
- long endoffset = offset;
- for (MessageAndOffset msg : msgs) {
- emit(collector, msg.message());
- endoffset = msg.nextOffset();
- }
- Map newMeta = new HashMap();
- newMeta.put("offset", offset);
- newMeta.put("nextOffset", endoffset);
- newMeta.put("instanceId", _topologyInstanceId);
- newMeta.put("partition", partition.partition);
- newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
- newMeta.put("topic", _config.topic);
- newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
- return newMeta;
- }
-
- private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
- ByteBufferMessageSet msgs;
- long start = System.nanoTime();
- FetchRequestBuilder builder = new FetchRequestBuilder();
- FetchRequest fetchRequest = builder.addFetch(_config.topic, partition.partition, offset, _config.fetchSizeBytes).clientId(_config.clientId).build();
- msgs = consumer.fetch(fetchRequest).messageSet(_config.topic, partition.partition);
- long end = System.nanoTime();
- long millis = (end - start) / 1000000;
- _kafkaMeanFetchLatencyMetric.update(millis);
- _kafkaMaxFetchLatencyMetric.update(millis);
- return msgs;
- }
-
- /**
- * re-emit the batch described by the meta data provided
- *
- * @param attempt
- * @param collector
- * @param partition
- * @param meta
- */
- private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
- LOG.info("re-emitting batch, attempt " + attempt);
- String instanceId = (String) meta.get("instanceId");
- if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
- SimpleConsumer consumer = _connections.register(partition);
- long offset = (Long) meta.get("offset");
- long nextOffset = (Long) meta.get("nextOffset");
- ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
- for (MessageAndOffset msg : msgs) {
- if (offset == nextOffset) break;
- if (offset > nextOffset) {
- throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
- }
- emit(collector, msg.message());
- offset = msg.nextOffset();
- }
- }
- }
-
- private void emit(TridentCollector collector, Message msg) {
- Iterable<List<Object>> values =
- _config.scheme.deserialize(Utils.toByteArray(msg.payload()));
- if (values != null) {
- for (List<Object> value : values) {
- collector.emit(value);
- }
- }
- }
-
- private void clear() {
- _connections.clear();
- }
-
- private List<Partition> orderPartitions(GlobalPartitionInformation partitions) {
- return partitions.getOrderedPartitions();
- }
-
- private void refresh(List<Partition> list) {
- _connections.clear();
- _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
- }
-
-
- public IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map> asOpaqueEmitter() {
-
- return new IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map>() {
-
- /**
- * Emit a batch of tuples for a partition/transaction.
- *
- * Return the metadata describing this batch that will be used as lastPartitionMeta
- * for defining the parameters of the next batch.
- */
- @Override
- public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
- return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
- }
-
- @Override
- public void refreshPartitions(List<Partition> partitions) {
- refresh(partitions);
- }
-
- @Override
- public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
- return orderPartitions(partitionInformation);
- }
-
- @Override
- public void close() {
- clear();
- }
- };
- }
-
- public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
- return new IPartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map>() {
-
- /**
- * Emit a batch of tuples for a partition/transaction that's never been emitted before.
- * Return the metadata that can be used to reconstruct this partition/batch in the future.
- */
- @Override
- public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
- return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
- }
-
- /**
- * Emit a batch of tuples for a partition/transaction that has been emitted before, using
- * the metadata created when it was first emitted.
- */
- @Override
- public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
- reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
- }
-
- /**
- * This method is called when this task is responsible for a new set of partitions. Should be used
- * to manage things like connections to brokers.
- */
- @Override
- public void refreshPartitions(List<Partition> partitions) {
- refresh(partitions);
- }
-
- @Override
- public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
- return orderPartitions(partitionInformation);
- }
-
- @Override
- public void close() {
- clear();
- }
- };
-
- }
+ public static final Logger LOG = LoggerFactory.getLogger(TridentKafkaEmitter.class);
+
+ private DynamicPartitionConnections _connections;
+ private String _topologyName;
+ private KafkaUtils.KafkaOffsetMetric _kafkaOffsetMetric;
+ private ReducedMetric _kafkaMeanFetchLatencyMetric;
+ private CombinedMetric _kafkaMaxFetchLatencyMetric;
+ private TridentKafkaConfig _config;
+ private String _topologyInstanceId;
+
+ public TridentKafkaEmitter(Map conf, TopologyContext context, TridentKafkaConfig config, String topologyInstanceId) {
+ _config = config;
+ _topologyInstanceId = topologyInstanceId;
+ _connections = new DynamicPartitionConnections(_config, KafkaUtils.makeBrokerReader(conf, _config));
+ _topologyName = (String) conf.get(Config.TOPOLOGY_NAME);
+ _kafkaOffsetMetric = new KafkaUtils.KafkaOffsetMetric(_config.topic, _connections);
+ context.registerMetric("kafkaOffset", _kafkaOffsetMetric, 60);
+ _kafkaMeanFetchLatencyMetric = context.registerMetric("kafkaFetchAvg", new MeanReducer(), 60);
+ _kafkaMaxFetchLatencyMetric = context.registerMetric("kafkaFetchMax", new MaxMetric(), 60);
+ }
+
+
+ private Map failFastEmitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+ SimpleConsumer consumer = _connections.register(partition);
+ Map ret = doEmitNewPartitionBatch(consumer, partition, collector, lastMeta);
+ _kafkaOffsetMetric.setLatestEmittedOffset(partition, (Long) ret.get("offset"));
+ return ret;
+ }
+
+ private Map emitNewPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map lastMeta) {
+ try {
+ return failFastEmitNewPartitionBatch(attempt, collector, partition, lastMeta);
+ } catch (FailedFetchException e) {
+ LOG.warn("Failed to fetch from partition " + partition);
+ if (lastMeta == null) {
+ return null;
+ } else {
+ Map ret = new HashMap();
+ ret.put("offset", lastMeta.get("nextOffset"));
+ ret.put("nextOffset", lastMeta.get("nextOffset"));
+ ret.put("partition", partition.partition);
+ ret.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
+ ret.put("topic", _config.topic);
+ ret.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
+ return ret;
+ }
+ }
+ }
+
+ private Map doEmitNewPartitionBatch(SimpleConsumer consumer, Partition partition, TridentCollector collector, Map lastMeta) {
+ long offset;
+ if (lastMeta != null) {
+ String lastInstanceId = null;
+ Map lastTopoMeta = (Map) lastMeta.get("topology");
+ if (lastTopoMeta != null) {
+ lastInstanceId = (String) lastTopoMeta.get("id");
+ }
+ if (_config.forceFromStart && !_topologyInstanceId.equals(lastInstanceId)) {
+ offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, _config.startOffsetTime);
+ } else {
+ offset = (Long) lastMeta.get("nextOffset");
+ }
+ } else {
+ long startTime = kafka.api.OffsetRequest.LatestTime();
+ if (_config.forceFromStart) {
+ startTime = _config.startOffsetTime;
+ }
+ offset = KafkaUtils.getOffset(consumer, _config.topic, partition.partition, startTime);
+ }
+ ByteBufferMessageSet msgs;
+ try {
+ msgs = fetchMessages(consumer, partition, offset);
+ } catch (Exception e) {
+ if (e instanceof ConnectException) {
+ throw new FailedFetchException(e);
+ } else {
+ throw new RuntimeException(e);
+ }
+ }
+ long endoffset = offset;
+ for (MessageAndOffset msg : msgs) {
+ emit(collector, msg.message());
+ endoffset = msg.nextOffset();
+ }
+ Map newMeta = new HashMap();
+ newMeta.put("offset", offset);
+ newMeta.put("nextOffset", endoffset);
+ newMeta.put("instanceId", _topologyInstanceId);
+ newMeta.put("partition", partition.partition);
+ newMeta.put("broker", ImmutableMap.of("host", partition.host.host, "port", partition.host.port));
+ newMeta.put("topic", _config.topic);
+ newMeta.put("topology", ImmutableMap.of("name", _topologyName, "id", _topologyInstanceId));
+ return newMeta;
+ }
+
+ private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
+ ByteBufferMessageSet msgs;
+ long start = System.nanoTime();
+ FetchRequestBuilder builder = new FetchRequestBuilder();
+ FetchRequest fetchRequest = builder.addFetch(_config.topic, partition.partition, offset, _config.fetchSizeBytes).clientId(_config.clientId).build();
+ msgs = consumer.fetch(fetchRequest).messageSet(_config.topic, partition.partition);
+ long end = System.nanoTime();
+ long millis = (end - start) / 1000000;
+ _kafkaMeanFetchLatencyMetric.update(millis);
+ _kafkaMaxFetchLatencyMetric.update(millis);
+ return msgs;
+ }
+
+ /**
+ * re-emit the batch described by the meta data provided
+ *
+ * @param attempt
+ * @param collector
+ * @param partition
+ * @param meta
+ */
+ private void reEmitPartitionBatch(TransactionAttempt attempt, TridentCollector collector, Partition partition, Map meta) {
+ LOG.info("re-emitting batch, attempt " + attempt);
+ String instanceId = (String) meta.get("instanceId");
+ if (!_config.forceFromStart || instanceId.equals(_topologyInstanceId)) {
+ SimpleConsumer consumer = _connections.register(partition);
+ long offset = (Long) meta.get("offset");
+ long nextOffset = (Long) meta.get("nextOffset");
+ ByteBufferMessageSet msgs = fetchMessages(consumer, partition, offset);
+ for (MessageAndOffset msg : msgs) {
+ if (offset == nextOffset) {
+ break;
+ }
+ if (offset > nextOffset) {
+ throw new RuntimeException("Error when re-emitting batch. overshot the end offset");
+ }
+ emit(collector, msg.message());
+ offset = msg.nextOffset();
+ }
+ }
+ }
+
+ private void emit(TridentCollector collector, Message msg) {
+ Iterable<List<Object>> values =
+ _config.scheme.deserialize(Utils.toByteArray(msg.payload()));
+ if (values != null) {
+ for (List<Object> value : values) {
+ collector.emit(value);
+ }
+ }
+ }
+
+ private void clear() {
+ _connections.clear();
+ }
+
+ private List<Partition> orderPartitions(GlobalPartitionInformation partitions) {
+ return partitions.getOrderedPartitions();
+ }
+
+ private void refresh(List<Partition> list) {
+ _connections.clear();
+ _kafkaOffsetMetric.refreshPartitions(new HashSet<Partition>(list));
+ }
+
+
+ public IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map> asOpaqueEmitter() {
+
+ return new IOpaquePartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map>() {
+
+ /**
+ * Emit a batch of tuples for a partition/transaction.
+ *
+ * Return the metadata describing this batch that will be used as lastPartitionMeta
+ * for defining the parameters of the next batch.
+ */
+ @Override
+ public Map emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+ return emitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+ }
+
+ @Override
+ public void refreshPartitions(List<Partition> partitions) {
+ refresh(partitions);
+ }
+
+ @Override
+ public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
+ return orderPartitions(partitionInformation);
+ }
+
+ @Override
+ public void close() {
+ clear();
+ }
+ };
+ }
+
+ public IPartitionedTridentSpout.Emitter asTransactionalEmitter() {
+ return new IPartitionedTridentSpout.Emitter<GlobalPartitionInformation, Partition, Map>() {
+
+ /**
+ * Emit a batch of tuples for a partition/transaction that's never been emitted before.
+ * Return the metadata that can be used to reconstruct this partition/batch in the future.
+ */
+ @Override
+ public Map emitPartitionBatchNew(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+ return failFastEmitNewPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+ }
+
+ /**
+ * Emit a batch of tuples for a partition/transaction that has been emitted before, using
+ * the metadata created when it was first emitted.
+ */
+ @Override
+ public void emitPartitionBatch(TransactionAttempt transactionAttempt, TridentCollector tridentCollector, Partition partition, Map map) {
+ reEmitPartitionBatch(transactionAttempt, tridentCollector, partition, map);
+ }
+
+ /**
+ * This method is called when this task is responsible for a new set of partitions. Should be used
+ * to manage things like connections to brokers.
+ */
+ @Override
+ public void refreshPartitions(List<Partition> partitions) {
+ refresh(partitions);
+ }
+
+ @Override
+ public List<Partition> getOrderedPartitions(GlobalPartitionInformation partitionInformation) {
+ return orderPartitions(partitionInformation);
+ }
+
+ @Override
+ public void close() {
+ clear();
+ }
+ };
+
+ }
}