You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2015/04/01 19:49:39 UTC
[08/45] storm git commit: STORM-713: Include topic information with
Kafka metrics.
STORM-713: Include topic information with Kafka metrics.
Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1e3bf8c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1e3bf8c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1e3bf8c0
Branch: refs/heads/nimbus-ha-branch
Commit: 1e3bf8c09cb1be6b60ca57fce2b6d859a57e173a
Parents: bb8d48d
Author: Craig Hawco <cr...@upsight.com>
Authored: Thu Mar 19 11:35:59 2015 -0400
Committer: Craig Hawco <cr...@upsight.com>
Committed: Thu Mar 19 14:02:26 2015 -0400
----------------------------------------------------------------------
.../storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 16 ++++++++--------
1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/storm/blob/1e3bf8c0/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99..50241f7 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -121,19 +121,19 @@ public class KafkaUtils {
}
long latestEmittedOffset = e.getValue();
long spoutLag = latestTimeOffset - latestEmittedOffset;
- ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
- ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
- ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
- ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
+ ret.put(_topic + "/" + partition.getId() + "/" + "spoutLag", spoutLag);
+ ret.put(_topic + "/" + partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
+ ret.put(_topic + "/" + partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
+ ret.put(_topic + "/" + partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
totalSpoutLag += spoutLag;
totalEarliestTimeOffset += earliestTimeOffset;
totalLatestTimeOffset += latestTimeOffset;
totalLatestEmittedOffset += latestEmittedOffset;
}
- ret.put("totalSpoutLag", totalSpoutLag);
- ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
- ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
- ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
+ ret.put(_topic + "/" + "totalSpoutLag", totalSpoutLag);
+ ret.put(_topic + "/" + "totalEarliestTimeOffset", totalEarliestTimeOffset);
+ ret.put(_topic + "/" + "totalLatestTimeOffset", totalLatestTimeOffset);
+ ret.put(_topic + "/" + "totalLatestEmittedOffset", totalLatestEmittedOffset);
return ret;
} else {
LOG.info("Metrics Tick: Not enough data to calculate spout lag.");