You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pa...@apache.org on 2015/04/01 19:49:39 UTC

[08/45] storm git commit: STORM-713: Include topic information with Kafka metrics.

STORM-713: Include topic information with Kafka metrics.


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/1e3bf8c0
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/1e3bf8c0
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/1e3bf8c0

Branch: refs/heads/nimbus-ha-branch
Commit: 1e3bf8c09cb1be6b60ca57fce2b6d859a57e173a
Parents: bb8d48d
Author: Craig Hawco <cr...@upsight.com>
Authored: Thu Mar 19 11:35:59 2015 -0400
Committer: Craig Hawco <cr...@upsight.com>
Committed: Thu Mar 19 14:02:26 2015 -0400

----------------------------------------------------------------------
 .../storm-kafka/src/jvm/storm/kafka/KafkaUtils.java | 16 ++++++++--------
 1 file changed, 8 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/1e3bf8c0/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
index 137dc99..50241f7 100644
--- a/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
+++ b/external/storm-kafka/src/jvm/storm/kafka/KafkaUtils.java
@@ -121,19 +121,19 @@ public class KafkaUtils {
                         }
                         long latestEmittedOffset = e.getValue();
                         long spoutLag = latestTimeOffset - latestEmittedOffset;
-                        ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
-                        ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
-                        ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
-                        ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "spoutLag", spoutLag);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
+                        ret.put(_topic + "/" + partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                         totalSpoutLag += spoutLag;
                         totalEarliestTimeOffset += earliestTimeOffset;
                         totalLatestTimeOffset += latestTimeOffset;
                         totalLatestEmittedOffset += latestEmittedOffset;
                     }
-                    ret.put("totalSpoutLag", totalSpoutLag);
-                    ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
-                    ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
-                    ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
+                    ret.put(_topic + "/" + "totalSpoutLag", totalSpoutLag);
+                    ret.put(_topic + "/" + "totalEarliestTimeOffset", totalEarliestTimeOffset);
+                    ret.put(_topic + "/" + "totalLatestTimeOffset", totalLatestTimeOffset);
+                    ret.put(_topic + "/" + "totalLatestEmittedOffset", totalLatestEmittedOffset);
                     return ret;
                 } else {
                     LOG.info("Metrics Tick: Not enough data to calculate spout lag.");