You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/13 18:11:04 UTC

git commit: kafka-801; Fix MessagesInPerSec mbean to count uncompressed message rate; patched by Jun Rao; reviewed by Neha Narkhede

Updated Branches:
  refs/heads/0.8 290d5e0ea -> dd9676163


kafka-801; Fix MessagesInPerSec mbean to count uncompressed message rate; patched by Jun Rao; reviewed by Neha Narkhede


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd967616
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd967616
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd967616

Branch: refs/heads/0.8
Commit: dd9676163956b3e577808a1e9744aa9fb5e83e4e
Parents: 290d5e0
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Mar 13 10:10:52 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 13 10:10:52 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala |    6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/dd967616/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b2a7170..34c5376 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -265,9 +265,6 @@ private[kafka] class Log(val dir: File,
     if(messageSetInfo.count == 0) {
       (-1L, -1L)
     } else {
-      BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
-      BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messageSetInfo.count)
-
       // trim any invalid bytes or partial messages before appending it to the on-disk log
       var validMessages = trimInvalidBytes(messages)
 
@@ -288,6 +285,9 @@ private[kafka] class Log(val dir: File,
                 case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
               }
               val lastOffset = offsetCounter.get - 1
+              val numMessages = lastOffset - firstOffset + 1
+              BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
+              BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
               (firstOffset, lastOffset)
             } else {
               require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)