You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2013/03/13 18:11:04 UTC
git commit: kafka-801;
Fix MessagesInPerSec mbean to count uncompressed message rate;
patched by Jun Rao; reviewed by Neha Narkhede
Updated Branches:
refs/heads/0.8 290d5e0ea -> dd9676163
kafka-801; Fix MessagesInPerSec mbean to count uncompressed message rate; patched by Jun Rao; reviewed by Neha Narkhede
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/dd967616
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/dd967616
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/dd967616
Branch: refs/heads/0.8
Commit: dd9676163956b3e577808a1e9744aa9fb5e83e4e
Parents: 290d5e0
Author: Jun Rao <ju...@gmail.com>
Authored: Wed Mar 13 10:10:52 2013 -0700
Committer: Jun Rao <ju...@gmail.com>
Committed: Wed Mar 13 10:10:52 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 6 +++---
1 files changed, 3 insertions(+), 3 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/dd967616/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index b2a7170..34c5376 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -265,9 +265,6 @@ private[kafka] class Log(val dir: File,
if(messageSetInfo.count == 0) {
(-1L, -1L)
} else {
- BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(messageSetInfo.count)
- BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(messageSetInfo.count)
-
// trim any invalid bytes or partial messages before appending it to the on-disk log
var validMessages = trimInvalidBytes(messages)
@@ -288,6 +285,9 @@ private[kafka] class Log(val dir: File,
case e: IOException => throw new KafkaException("Error in validating messages while appending to log '%s'".format(name), e)
}
val lastOffset = offsetCounter.get - 1
+ val numMessages = lastOffset - firstOffset + 1
+ BrokerTopicStats.getBrokerTopicStats(topicName).messagesInRate.mark(numMessages)
+ BrokerTopicStats.getBrokerAllTopicsStats.messagesInRate.mark(numMessages)
(firstOffset, lastOffset)
} else {
require(messageSetInfo.offsetsMonotonic, "Out of order offsets found in " + messages)