You are viewing a plain-text version of this content. The canonical link to the original message was not preserved in this copy.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:44 UTC
[45/50] [abbrv] git commit: add topic to committed path
add topic to committed path
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/859a2e81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/859a2e81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/859a2e81
Branch: refs/heads/master
Commit: 859a2e81be8f61720fc2965acef5f73f1449a7bb
Parents: a72aafa
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Apr 9 10:57:37 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Apr 9 10:57:37 2014 -0400
----------------------------------------------------------------------
.../src/jvm/storm/kafka/DynamicBrokersReader.java | 18 +++++++++++-------
.../src/jvm/storm/kafka/KafkaUtils.java | 4 ++--
.../src/jvm/storm/kafka/PartitionManager.java | 2 +-
3 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/859a2e81/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
index cd751fe..b9085bc 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
@@ -26,13 +26,17 @@ public class DynamicBrokersReader {
public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
_zkPath = zkPath;
_topic = topic;
- _curator = CuratorFrameworkFactory.newClient(
- zkStr,
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
- 15000,
- new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
- Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
- _curator.start();
+ try {
+ _curator = CuratorFrameworkFactory.newClient(
+ zkStr,
+ Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+ 15000,
+ new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+ Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+ _curator.start();
+ } catch (Exception ex) {
+ LOG.error("can't connect to zookeeper");
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/859a2e81/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 0e7f601..313e08e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -41,7 +41,7 @@ public class KafkaUtils {
public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
- if (config.forceFromStart) {
+ if ( config.forceFromStart ) {
startOffsetTime = config.startOffsetTime;
}
return getOffset(consumer, topic, partition, startOffsetTime);
@@ -93,7 +93,7 @@ public class KafkaUtils {
LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
return null;
}
- long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
+ long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
LOG.warn("No data found in Kafka Partition " + partition.getId());
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/859a2e81/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 03075bb..68151eb 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -194,7 +194,7 @@ public class PartitionManager {
}
private String committedPath() {
- return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
+ return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _spoutConfig.topic + "/" + _partition.getId();
}
public long queryPartitionOffsetLatestTime() {