You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:06 UTC
[07/50] [abbrv] git commit: Use partition id in zk path
Use partition id in zk path
* added topology id to logs
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/b5de86ed
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/b5de86ed
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/b5de86ed
Branch: refs/heads/master
Commit: b5de86ed7f966018454fba667a42010f0f73f490
Parents: 735b87f
Author: wurstmeister <wu...@users.noreply.github.com>
Authored: Sun Jan 5 12:17:55 2014 +0000
Committer: wurstmeister <wu...@users.noreply.github.com>
Committed: Sun Jan 5 12:17:55 2014 +0000
----------------------------------------------------------------------
src/jvm/storm/kafka/Partition.java | 2 +-
src/jvm/storm/kafka/PartitionManager.java | 12 +++++++-----
2 files changed, 8 insertions(+), 6 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b5de86ed/src/jvm/storm/kafka/Partition.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/Partition.java b/src/jvm/storm/kafka/Partition.java
index bbb4fbb..96a3ad7 100644
--- a/src/jvm/storm/kafka/Partition.java
+++ b/src/jvm/storm/kafka/Partition.java
@@ -41,7 +41,7 @@ public class Partition implements ISpoutPartition {
@Override
public String getId() {
- return toString();
+ return "partition_" + partition;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/b5de86ed/src/jvm/storm/kafka/PartitionManager.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/PartitionManager.java b/src/jvm/storm/kafka/PartitionManager.java
index 623bc10..e3e31db 100644
--- a/src/jvm/storm/kafka/PartitionManager.java
+++ b/src/jvm/storm/kafka/PartitionManager.java
@@ -63,14 +63,16 @@ public class PartitionManager {
String jsonTopologyId = null;
Long jsonOffset = null;
+ String path = committedPath();
try {
- Map<Object, Object> json = _state.readJSON(committedPath());
+ Map<Object, Object> json = _state.readJSON(path);
+ LOG.info("Read partition information from: " + path + " --> " + json );
if (json != null) {
jsonTopologyId = (String) ((Map<Object, Object>) json.get("topology")).get("id");
jsonOffset = (Long) json.get("offset");
}
} catch (Throwable e) {
- LOG.warn("Error reading and/or parsing at ZkNode: " + committedPath(), e);
+ LOG.warn("Error reading and/or parsing at ZkNode: " + path, e);
}
if (!topologyInstanceId.equals(jsonTopologyId) && spoutConfig.forceFromStart) {
@@ -81,7 +83,7 @@ public class PartitionManager {
LOG.info("Setting last commit offset to HEAD.");
} else {
_committedTo = jsonOffset;
- LOG.info("Read last commit offset from zookeeper: " + _committedTo);
+ LOG.info("Read last commit offset from zookeeper: " + _committedTo + "; old topology_id: " + jsonTopologyId + " - new topology_id: " + topologyInstanceId );
}
LOG.info("Starting Kafka " + _consumer.host() + ":" + id.partition + " from offset " + _committedTo);
@@ -205,11 +207,11 @@ public class PartitionManager {
LOG.info("Wrote committed offset to ZK: " + committedTo);
_committedTo = committedTo;
}
- LOG.info("Committed offset " + committedTo + " for " + _partition);
+ LOG.info("Committed offset " + committedTo + " for " + _partition + " for topology: " + _topologyInstanceId);
}
private String committedPath() {
- return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition;
+ return _spoutConfig.zkRoot + "/" + _spoutConfig.id + "/" + _partition.getId();
}
public long queryPartitionOffsetLatestTime() {