You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@storm.apache.org by kn...@apache.org on 2016/02/24 20:56:27 UTC

[2/4] storm git commit: Update time interval counting on TridentKafkaEmitter

Update time interval counting on TridentKafkaEmitter


Project: http://git-wip-us.apache.org/repos/asf/storm/repo
Commit: http://git-wip-us.apache.org/repos/asf/storm/commit/d89f7027
Tree: http://git-wip-us.apache.org/repos/asf/storm/tree/d89f7027
Diff: http://git-wip-us.apache.org/repos/asf/storm/diff/d89f7027

Branch: refs/heads/master
Commit: d89f7027fcaf5576b5b4a14488f42c71094617ad
Parents: 27373ba
Author: darionyaphet <da...@gmail.com>
Authored: Wed Feb 24 12:34:54 2016 +0800
Committer: darionyaphet <da...@gmail.com>
Committed: Wed Feb 24 12:34:54 2016 +0800

----------------------------------------------------------------------
 .../jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java | 5 ++---
 1 file changed, 2 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/storm/blob/d89f7027/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
----------------------------------------------------------------------
diff --git a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
index 9732c8c..512363c 100644
--- a/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
+++ b/external/storm-kafka/src/jvm/org/apache/storm/kafka/trident/TridentKafkaEmitter.java
@@ -136,11 +136,10 @@ public class TridentKafkaEmitter {
     }
 
     private ByteBufferMessageSet fetchMessages(SimpleConsumer consumer, Partition partition, long offset) {
-        long start = System.nanoTime();
+        long start = System.currentTimeMillis();
         ByteBufferMessageSet msgs = null;
         msgs = KafkaUtils.fetchMessages(_config, consumer, partition, offset);
-        long end = System.nanoTime();
-        long millis = (end - start) / 1000000;
+        long millis = System.currentTimeMillis() - start;
         _kafkaMeanFetchLatencyMetric.update(millis);
         _kafkaMaxFetchLatencyMetric.update(millis);
         return msgs;