Posted to commits@kafka.apache.org by jj...@apache.org on 2015/02/18 02:35:50 UTC

[2/2] kafka git commit: KAFKA-1914; Include total produce/fetch stats in broker topic metrics.

KAFKA-1914; Include total produce/fetch stats in broker topic metrics.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cb40ec2e
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cb40ec2e
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cb40ec2e

Branch: refs/heads/trunk
Commit: cb40ec2e7a73bdecdfea6c88ffd8e8717b630d0f
Parents: b8be314
Author: Aditya Auradkar <aa...@linkedin.com>
Authored: Tue Feb 17 17:36:12 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Tue Feb 17 17:36:12 2015 -0800

----------------------------------------------------------------------
 core/src/main/scala/kafka/server/KafkaRequestHandler.scala  | 2 ++
 core/src/main/scala/kafka/server/ReplicaManager.scala       | 6 ++++++
 core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala | 8 +++++++-
 3 files changed, 15 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
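
The new TotalProduceRequestsPerSec and TotalFetchRequestsPerSec meters sit alongside the existing Failed*RequestsPerSec meters in BrokerTopicMetrics. As a rough illustration (not part of this commit; the topic name is a placeholder), the new counters could be read back through the same BrokerTopicStats accessors the patch uses:

    // Hedged sketch: reading the new meters via the accessors used in the diff below.
    // "my-topic" is hypothetical; count() returns the cumulative number of mark() calls.
    val topicStats     = BrokerTopicStats.getBrokerTopicStats("my-topic")
    val allTopicsStats = BrokerTopicStats.getBrokerAllTopicsStats()
    println(s"produce requests for my-topic: ${topicStats.totalProduceRequestRate.count()}")
    println(s"fetch requests across all topics: ${allTopicsStats.totalFetchRequestRate.count()}")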


http://git-wip-us.apache.org/repos/asf/kafka/blob/cb40ec2e/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
index e4053fb..4d86bdf 100644
--- a/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
+++ b/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
@@ -105,6 +105,8 @@ class BrokerTopicMetrics(name: Option[String]) extends KafkaMetricsGroup {
   val bytesRejectedRate = newMeter("BytesRejectedPerSec", "bytes", TimeUnit.SECONDS, tags)
   val failedProduceRequestRate = newMeter("FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
   val failedFetchRequestRate = newMeter("FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+  val totalProduceRequestRate = newMeter("TotalProduceRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
+  val totalFetchRequestRate = newMeter("TotalFetchRequestsPerSec", "requests", TimeUnit.SECONDS, tags)
 }
 
 object BrokerTopicStats extends Logging {
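
For context, newMeter here delegates to the metrics library wrapped by KafkaMetricsGroup. A minimal standalone sketch of the Meter behaviour the new fields rely on (this assumes the com.yammer.metrics 2.x API; the assumption and the MeterSketch object are illustrative, not part of this diff):

    import java.util.concurrent.TimeUnit
    import com.yammer.metrics.Metrics

    // Standalone sketch of Meter semantics (assumes the com.yammer.metrics 2.x
    // library that KafkaMetricsGroup wraps; not part of this commit).
    object MeterSketch {
      def main(args: Array[String]): Unit = {
        val meter = Metrics.newMeter(getClass, "TotalProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
        meter.mark()                    // one produce request observed
        println(meter.count())          // cumulative marks -> 1
        println(meter.oneMinuteRate())  // derived rate, requests per second
      }
    }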

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb40ec2e/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 399f7c4..b82ff55 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -323,6 +323,9 @@ class ReplicaManager(val config: KafkaConfig,
                                requiredAcks: Short): Map[TopicAndPartition, LogAppendResult] = {
     trace("Append [%s] to local log ".format(messagesPerPartition))
     messagesPerPartition.map { case (topicAndPartition, messages) =>
+      BrokerTopicStats.getBrokerTopicStats(topicAndPartition.topic).totalProduceRequestRate.mark()
+      BrokerTopicStats.getBrokerAllTopicsStats().totalProduceRequestRate.mark()
+
       // reject appending to internal topics if it is not allowed
       if (Topic.InternalTopics.contains(topicAndPartition.topic) && !internalTopicsAllowed) {
 
@@ -439,6 +442,9 @@ class ReplicaManager(val config: KafkaConfig,
                        readPartitionInfo: Map[TopicAndPartition, PartitionFetchInfo]): Map[TopicAndPartition, LogReadResult] = {
 
     readPartitionInfo.map { case (TopicAndPartition(topic, partition), PartitionFetchInfo(offset, fetchSize)) =>
+      BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.mark()
+      BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.mark()
+
       val partitionDataAndOffsetInfo =
         try {
           trace("Fetching log segment for topic %s, partition %d, offset %d, size %d".format(topic, partition, offset, fetchSize))

http://git-wip-us.apache.org/repos/asf/kafka/blob/cb40ec2e/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
index ccf5e2e..292a042 100644
--- a/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
+++ b/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
@@ -129,13 +129,19 @@ class SimpleFetchTest extends JUnit3Suite {
    * should only return data up to the HW of the partition; when a fetch operation with read
    * committed data turned off is received, the replica manager could return data up to the LEO
    * of the local leader replica's log.
+   *
+   * This test also verifies the total fetch request counts recorded by the ReplicaManager.
    */
   def testReadFromLog() {
+    val initialTopicCount = BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count()
+    val initialAllTopicsCount = BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count()
 
     assertEquals("Reading committed data should return messages only up to high watermark", messagesToHW,
       replicaManager.readFromLocalLog(true, true, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
-
     assertEquals("Reading any data can return messages up to the end of the log", messagesToLEO,
       replicaManager.readFromLocalLog(true, false, fetchInfo).get(topicAndPartition).get.info.messageSet.head.message)
+
+    assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerTopicStats(topic).totalFetchRequestRate.count());
+    assertEquals("Counts should increment after fetch", initialTopicCount+2, BrokerTopicStats.getBrokerAllTopicsStats().totalFetchRequestRate.count());
   }
 }