You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/02/18 02:35:50 UTC
[2/2] kafka git commit: KAFKA-1914;
Include total produce/fetch stats in broker topic metrics.
KAFKA-1914; Include total produce/fetch stats in broker topic metrics.
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cb40ec2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cb40ec2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cb40ec2e
Branch: refs/heads/trunk
Commit: cb40ec2e7a73bdecdfea6c88ffd8e8717b630d0f
Parents: b8be314
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Feb 17 17:36:12 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Feb 17 17:36:12 2015 -0800
----------------------------------------------------------------------
core/src/main/scala/kafka/server/KafkaRequestHandler.scala | 2 ++
core/src/main/scala/kafka/server/ReplicaManager.scala | 6 ++++++
core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala | 8 +++++++-
3 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb40ec2e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index e4053fb..4d86bdf 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -105,6 +105,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
val bytesRejectedRate = newMeter("BytesRejectedPerSec", "bytes", TimeUnit.SECONDS, tags)
val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+ val totalProduceRequestRate = newMeter("TotalProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+ val totalFetchRequestRate = newMeter("TotalFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
}
object BrokerTopicStats extends Logging {
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb40ec2e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 399f7c4..b82ff55 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -323,6 +323,9 @@ class ReplicaManager(val config: KafkaConfig,
requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = {
trace("Append [%s] to local log ".format(messagesPerPartition))
messagesPerPartition.map { case (topicAndPartition, messages) =>
+ BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).totalProduceRequestRate.mark()
+ BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
+
// reject appending to internal topics if it is not allowed
if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) {
@@ -439,6 +442,9 @@ class ReplicaManager(val config: KafkaConfig,
readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = {
readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+ BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark()
+ BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
+
val partitionDataAndOffsetInfo =
try {
trace("Fetching log segment for topic %s, partition %d, offset %d, size %d".format(topic, partition, offset, fetchSize))
http://git-wip-us.apache.org/repos/asf/kafka/blob/cb40ec2e/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ccf5e2e..292a042 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -129,13 +129,19 @@ class SimpleFetchTest extends JUnit3Suite {
* should only return data up to the HW of the partition; when a fetch operation with read
* committed data turned off is received, the replica manager could return data up to the LEO
* of the local leader replica's log.
+ *
+ * This test also verifies counts of fetch requests recorded by the ReplicaManager
*/
def testReadFromLog() {
+ val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count();
+ val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count(); // separate baseline: the all-topics meter is also marked by fetches for other topics
assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
-
assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
+ // Two readFromLocalLog calls were made above, so each meter must have advanced by 2 relative to its own baseline.
+ assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count());
+ assertEquals("Counts should increment after fetch", initialAllTopicsCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count());
}
}