You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by pt...@apache.org on 2014/04/21 21:44:25 UTC
[26/50] [abbrv] git commit: Add a new per-partition and total metric,
"earliestTimeOffset". Rename "latestTime" metric to
"latestTimeOffset" to more closely conform to other,
related metrics' nomenclature.
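Add a new per-partition metric, "earliestTimeOffset", and a matching
total metric, "totalEarliestTimeOffset". Rename the "latestTime" and
"totalLatestTime" metrics to "latestTimeOffset" and
"totalLatestTimeOffset" so that they conform more closely to the
naming of the other, related metrics.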
Project: http://git-wip-us.apache.org/repos/asf/incubator-storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-storm/commit/c695c1b0
Tree: http://git-wip-us.apache.org/repos/asf/incubator-storm/tree/c695c1b0
Diff: http://git-wip-us.apache.org/repos/asf/incubator-storm/diff/c695c1b0
Branch: refs/heads/master
Commit: c695c1b06cd1f44fbbe1181a41a6c4ffe2186e01
Parents: bd0cc45
Author: Danijel Schiavuzzi <da...@infobip.com>
Authored: Thu Feb 20 18:09:25 2014 +0100
Committer: Danijel Schiavuzzi <da...@infobip.com>
Committed: Mon Feb 24 17:05:45 2014 +0100
----------------------------------------------------------------------
src/jvm/storm/kafka/KafkaUtils.java | 11 ++++++++---
1 file changed, 8 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-storm/blob/c695c1b0/src/jvm/storm/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/src/jvm/storm/kafka/KafkaUtils.java b/src/jvm/storm/kafka/KafkaUtils.java
index 300d998..eed438f 100644
--- a/src/jvm/storm/kafka/KafkaUtils.java
+++ b/src/jvm/storm/kafka/KafkaUtils.java
@@ -79,6 +79,7 @@ public class KafkaUtils {
public Object getValueAndReset() {
try {
long totalSpoutLag = 0;
+ long totalEarliestTimeOffset = 0;
long totalLatestTimeOffset = 0;
long totalLatestEmittedOffset = 0;
HashMap ret = new HashMap();
@@ -90,22 +91,26 @@ public class KafkaUtils {
LOG.warn("partitionToOffset contains partition not found in _connections. Stale partition data?");
return null;
}
+ long earliestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.EarliestTime());
long latestTimeOffset = getOffset(consumer, _topic, partition.partition, kafka.api.OffsetRequest.LatestTime());
- if (latestTimeOffset == 0) {
+ if (earliestTimeOffset == 0 || latestTimeOffset == 0) {
LOG.warn("No data found in Kafka Partition " + partition.getId());
return null;
}
long latestEmittedOffset = e.getValue();
long spoutLag = latestTimeOffset - latestEmittedOffset;
ret.put(partition.getId() + "/" + "spoutLag", spoutLag);
- ret.put(partition.getId() + "/" + "latestTime", latestTimeOffset);
+ ret.put(partition.getId() + "/" + "earliestTimeOffset", earliestTimeOffset);
+ ret.put(partition.getId() + "/" + "latestTimeOffset", latestTimeOffset);
ret.put(partition.getId() + "/" + "latestEmittedOffset", latestEmittedOffset);
totalSpoutLag += spoutLag;
+ totalEarliestTimeOffset += earliestTimeOffset;
totalLatestTimeOffset += latestTimeOffset;
totalLatestEmittedOffset += latestEmittedOffset;
}
ret.put("totalSpoutLag", totalSpoutLag);
- ret.put("totalLatestTime", totalLatestTimeOffset);
+ ret.put("totalEarliestTimeOffset", totalEarliestTimeOffset);
+ ret.put("totalLatestTimeOffset", totalLatestTimeOffset);
ret.put("totalLatestEmittedOffset", totalLatestEmittedOffset);
return ret;
} else {