You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:44 UTC

[45/50] [abbrv] git commit: add topic to committed path

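Add topic to committed path

PartitionManager.committedPath() now returns zkRoot/id/topic/partitionId
instead of zkRoot/id/partitionId. This commit also wraps the Curator client
creation and start-up in the DynamicBrokersReader constructor in a try/catch
that logs "can't connect to zookeeper", and makes whitespace-only changes in
KafkaUtils.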


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/859a2e81
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/859a2e81
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/859a2e81

Branch: refs/heads/master
Commit: 859a2e81be8f61720fc2965acef5f73f1449a7bb
Parents: a72aafa
Author: P. Taylor Goetz <pt...@gmail.com>
Authored: Wed Apr 9 10:57:37 2014 -0400
Committer: P. Taylor Goetz <pt...@gmail.com>
Committed: Wed Apr 9 10:57:37 2014 -0400

----------------------------------------------------------------------
 .../src/jvm/storm/kafka/DynamicBrokersReader.java | 18 +++++++++++-------
 .../src/jvm/storm/kafka/KafkaUtils.java           |  4 ++--
 .../src/jvm/storm/kafka/PartitionManager.java     |  2 +-
 3 files changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/859a2e81/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
index cd751fe..b9085bc 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/DynamicBrokersReader.java
@@ -26,13 +26,17 @@ public class DynamicBrokersReader {
     public DynamicBrokersReader(Map conf, String zkStr, String zkPath, String topic) {
         _zkPath = zkPath;
         _topic = topic;
-        _curator = CuratorFrameworkFactory.newClient(
-                zkStr,
-                Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
-                15000,
-                new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
-                        Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
-        _curator.start();
+        try {
+            _curator = CuratorFrameworkFactory.newClient(
+                    zkStr,
+                    Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_SESSION_TIMEOUT)),
+                    15000,
+                    new RetryNTimes(Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_TIMES)),
+                            Utils.getInt(conf.get(Config.STORM_ZOOKEEPER_RETRY_INTERVAL))));
+            _curator.start();
+        } catch (Exception ex) {
+            LOG.error("can't connect to zookeeper");
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/859a2e81/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 0e7f601..313e08e 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -41,7 +41,7 @@ public class KafkaUtils {
 
     public static long getOffset(SimpleConsumer consumer, String topic, int partition, KafkaConfig config) {
         long startOffsetTime = kafka.api.OffsetRequest.LatestTime();
-        if (config.forceFromStart) {
+        if ( config.forceFromStart ) {
             startOffsetTime = config.startOffsetTime;
         }
         return getOffset(consumer, topic, partition, startOffsetTime);
@@ -93,7 +93,7 @@ public class KafkaUtils {
                             LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                             return null;
                         }
-                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
+                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); 
                         long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
                         if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
                             LOG.warn("No data found in Kafka Partition " + partition.getId());

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/859a2e81/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
index 03075bb..68151eb 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/PartitionManager.java
@@ -194,7 +194,7 @@ public class PartitionManager {
     }
 
     private String committedPath() {
-        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
+        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" +  _spoutConfig.topic + "/" + _partition.getId();
     }
 
     public long queryPartitionOffsetLatestTime() {