You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:06 UTC

[07/50] [abbrv] git commit: Use partition id in zk path

Use partition id in zk path

* added topology id to logs


Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b5de86ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b5de86ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b5de86ed

Branch: refs/heads/master
Commit: b5de86ed7f966018454fba667a42010f0f73f490
Parents: 735b87f
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sun Jan 5 12:17:55 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sun Jan 5 12:17:55 2014 +0000

----------------------------------------------------------------------
 src/jvm/storm/kafka/Partition.java        |  2 +-
 src/jvm/storm/kafka/PartitionManager.java | 12 +++++++-----
 2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b5de86ed/src/jvm/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/Partition.java b/src/jvm/storm/kafka/Partition.java
index bbb4fbb..96a3ad7 100644
--- a/src/jvm/storm/kafka/Partition.java
+++ b/src/jvm/storm/kafka/Partition.java
@@ -41,7 +41,7 @@ public class Partition implements ISpoutPartition {
 
     @Override
     public String getId() {
-        return toString();
+        return "partition_" + partition;
     }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b5de86ed/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index 623bc10..e3e31db 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -63,14 +63,16 @@ public class PartitionManager {
 
         String jsonTopologyId = null;
         Long jsonOffset = null;
+        String path = committedPath();
         try {
-            Map<Object, Object> json = _state.readJSON(committedPath());
+            Map<Object, Object> json = _state.readJSON(path);
+            LOG.info("Read partition information from: " + path +  "  --> " + json );
             if (json != null) {
                 jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
                 jsonOffset = (Long) json.get("offset");
             }
         } catch (Throwable e) {
-            LOG.warn("Error reading and/or parsing at ZkNode: " + committedPath(), e);
+            LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
         }
 
         if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
@@ -81,7 +83,7 @@ public class PartitionManager {
             LOG.info("Setting last commit offset to HEAD.");
         } else {
             _committedTo = jsonOffset;
-            LOG.info("Read last commit offset from zookeeper: " + _committedTo);
+            LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
         }
 
         LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
@@ -205,11 +207,11 @@ public class PartitionManager {
             LOG.info("Wrote committed offset to ZK: " + committedTo);
             _committedTo = committedTo;
         }
-        LOG.info("Committed offset " + committedTo + " for " + _partition);
+        LOG.info("Committed offset " + committedTo + " for " + _partition + " for topology: " + _topologyInstanceId);
     }
 
     private String committedPath() {
-        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition;
+        return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
     }
 
     public long queryPartitionOffsetLatestTime() {