You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:25 UTC

[26/50] [abbrv] git commit: Add a new per-partition and total metric, "earliestTimeOffset". Rename "latestTime" metric to "latestTimeOffset" to more closely conform to other, related metrics' nomenclature.

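Add a new metric, "earliestTimeOffset", reported both per partition and
as a total ("totalEarliestTimeOffset"). Rename the "latestTime" and
"totalLatestTime" metrics to "latestTimeOffset" and
"totalLatestTimeOffset" so that they conform more closely to the
nomenclature of the other, related metrics.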

Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c695c1b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c695c1b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c695c1b0

Branch: refs/heads/master
Commit: c695c1b06cd1f44fbbe1181a41a6c4ffe2186e01
Parents: bd0cc45
Author: Danijel Schiavuzzi <da...@infobip.com>
Authored: Thu Feb 20 18:09:25 2014 +0100
Committer: Danijel Schiavuzzi <da...@infobip.com>
Committed: Mon Feb 24 17:05:45 2014 +0100

----------------------------------------------------------------------
 src/jvm/storm/kafka/KafkaUtils.java | 11 ++++++++---
 1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c695c1b0/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index 300d998..eed438f 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -79,6 +79,7 @@ public class KafkaUtils {
         public Object getValueAndReset() {
             try {
                 long totalSpoutLag = 0;
+                long totalEarliestTimeOffset = 0;
                 long totalLatestTimeOffset = 0;
                 long totalLatestEmittedOffset = 0;
                 HashMap ret = new HashMap();
@@ -90,22 +91,26 @@ public class KafkaUtils {
                             LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
                             return null;
                         }
+                        long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime()); 
                         long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
-                        if (latestTimeOffset == 0) {
+                        if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
                             LOG.warn("No data found in Kafka Partition " + partition.getId());
                             return null;
                         }
                         long latestEmittedOffset = e.getValue();
                         long spoutLag = latestTimeOffset - latestEmittedOffset;
                         ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
-                        ret.put(partition.getId() + "/" + "latestTime", latestTimeOffset);
+                        ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
+                        ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
                         ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
                         totalSpoutLag += spoutLag;
+                        totalEarliestTimeOffset += earliestTimeOffset;
                         totalLatestTimeOffset += latestTimeOffset;
                         totalLatestEmittedOffset += latestEmittedOffset;
                     }
                     ret.put("totalSpoutLag", totalSpoutLag);
-                    ret.put("totalLatestTime", totalLatestTimeOffset);
+                    ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
+                    ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
                     ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
                     return ret;
                 } else {