You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2013/05/31 19:43:36 UTC

git commit: KAFKA-921; Expose max lag and min fetch rate mbeans for consumers and replica fetchers; reviewed by Jun Rao.

Updated Branches:
  refs/heads/0.8 1caae2c2a -> 4850519a2


KAFKA-921; Expose max lag and min fetch rate mbeans for consumers and replica fetchers; reviewed by Jun Rao.


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/4850519a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/4850519a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/4850519a

Branch: refs/heads/0.8
Commit: 4850519a2c32b3ee882eb79375ba77de8d0488f7
Parents: 1caae2c
Author: Joel Koshy <jj...@gmail.com>
Authored: Fri May 31 10:43:28 2013 -0700
Committer: Joel Koshy <jj...@gmail.com>
Committed: Fri May 31 10:43:28 2013 -0700

----------------------------------------------------------------------
 .../kafka/consumer/ConsumerFetcherManager.scala    |    3 +-
 .../kafka/server/AbstractFetcherManager.scala      |   34 ++++++++++++++-
 .../scala/kafka/server/AbstractFetcherThread.scala |    2 +-
 .../scala/kafka/server/ReplicaFetcherManager.scala |    3 +-
 4 files changed, 38 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
index 96bd886..3e497b9 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerFetcherManager.scala
@@ -38,7 +38,8 @@ import java.util.concurrent.atomic.AtomicInteger
 class ConsumerFetcherManager(private val consumerIdString: String,
                              private val config: ConsumerConfig,
                              private val zkClient : ZkClient)
-        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds), 1) {
+        extends AbstractFetcherManager("ConsumerFetcherManager-%d".format(SystemTime.milliseconds),
+                                       config.groupId, 1) {
   private var partitionMap: immutable.Map[TopicAndPartition, PartitionTopicInfo] = null
   private var cluster: Cluster = null
   private val noLeaderPartitionSet = new mutable.HashSet[TopicAndPartition]

http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
index 4269219..15b7bd3 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherManager.scala
@@ -20,13 +20,45 @@ package kafka.server
 import scala.collection.mutable
 import kafka.utils.Logging
 import kafka.cluster.Broker
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
-abstract class AbstractFetcherManager(protected val name: String, numFetchers: Int = 1) extends Logging {
+abstract class AbstractFetcherManager(protected val name: String, metricPrefix: String, numFetchers: Int = 1)
+  extends Logging with KafkaMetricsGroup {
     // map of (source brokerid, fetcher Id per source broker) => fetcher
   private val fetcherThreadMap = new mutable.HashMap[BrokerAndFetcherId, AbstractFetcherThread]
   private val mapLock = new Object
   this.logIdent = "[" + name + "] "
 
+  newGauge(
+    metricPrefix + "-MaxLag",
+    new Gauge[Long] { // NOTE(review): value reads fetcherThreadMap without taking mapLock - confirm that is safe
+      // current max lag across all fetchers/topics/partitions; both folds are seeded with 0L, so 0 is the floor
+      def value = fetcherThreadMap.foldLeft(0L)((curMaxAll, fetcherThreadMapEntry) => {
+        fetcherThreadMapEntry._2.fetcherLagStats.stats.foldLeft(0L)((curMaxThread, fetcherLagStatsEntry) => { // max lag within one fetcher thread
+          curMaxThread.max(fetcherLagStatsEntry._2.lag)
+        }).max(curMaxAll)
+      })
+    }
+  )
+
+  newGauge(
+    metricPrefix + "-MinFetchRate",
+    {
+      new Gauge[Double] { // NOTE(review): value reads fetcherThreadMap without taking mapLock - confirm that is safe
+        // current min one-minute request rate across all fetcher threads; 0 when there are no fetchers
+        def value = {
+          val headRate: Double = // first fetcher's rate (0 if none) seeds the fold so the min is not pinned at a 0 seed
+            fetcherThreadMap.headOption.map(_._2.fetcherStats.requestRate.oneMinuteRate).getOrElse(0)
+
+          fetcherThreadMap.foldLeft(headRate)((curMinAll, fetcherThreadMapEntry) => {
+            fetcherThreadMapEntry._2.fetcherStats.requestRate.oneMinuteRate.min(curMinAll)
+          })
+        }
+      }
+    }
+  )
+
   private def getFetcherId(topic: String, partitionId: Int) : Int = { // derives a fetcher id from the topic hash and partition id
     (topic.hashCode() + 31 * partitionId) % numFetchers // NOTE(review): % keeps the dividend's sign, so a negative hashCode (or overflowed sum) gives a negative id - confirm callers tolerate this
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
index 48100f4..b7cbb98 100644
--- a/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
+++ b/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
@@ -221,7 +221,7 @@ class FetcherLagMetrics(metricId: ClientIdBrokerTopicPartition) extends KafkaMet
 
 class FetcherLagStats(metricId: ClientIdAndBroker) {
   private val valueFactory = (k: ClientIdBrokerTopicPartition) => new FetcherLagMetrics(k)
-  private val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
+  val stats = new Pool[ClientIdBrokerTopicPartition, FetcherLagMetrics](Some(valueFactory))
 
   def getFetcherLagStats(topic: String, partitionId: Int): FetcherLagMetrics = {
     stats.getAndMaybePut(new ClientIdBrokerTopicPartition(metricId.clientId, metricId.brokerInfo, topic, partitionId))

http://git-wip-us.apache.org/repos/asf/kafka/blob/4850519a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
index 7f775ec..351dbba 100644
--- a/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaFetcherManager.scala
@@ -20,7 +20,8 @@ package kafka.server
 import kafka.cluster.Broker
 
 class ReplicaFetcherManager(private val brokerConfig: KafkaConfig, private val replicaMgr: ReplicaManager)
-        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId, brokerConfig.numReplicaFetchers) {
+        extends AbstractFetcherManager("ReplicaFetcherManager on broker " + brokerConfig.brokerId,
+                                       "Replica", brokerConfig.numReplicaFetchers) {
 
   override def createFetcherThread(fetcherId: Int, sourceBroker: Broker): AbstractFetcherThread = {
     new ReplicaFetcherThread("ReplicaFetcherThread-%d-%d".format(fetcherId, sourceBroker.id), sourceBroker, brokerConfig, replicaMgr)