Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/13 06:27:16 UTC

svn commit: r1384202 [1/2] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/ core/src/main/scala/kafka/log/ core/src/main/scala/k...

Author: junrao
Date: Thu Sep 13 04:27:13 2012
New Revision: 1384202

URL: http://svn.apache.org/viewvc?rev=1384202&view=rev
Log:
Improve Kafka internal metrics; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; KAFKA-203
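
For orientation: the patch retires the hand-rolled SnapshotStats/MBean plumbing and records everything through meters, gauges, histograms, and timers from the Yammer Metrics library, registered via the KafkaMetricsGroup trait. A minimal before/after sketch (the helper signatures match the trait as modified below; the class and metric names are made up):

    import java.util.concurrent.TimeUnit
    import kafka.metrics.KafkaMetricsGroup

    // Old style: implement a *StatsMBean trait, keep AtomicLong counters,
    // and register with Utils.registerMBean by hand. New style: mix in
    // KafkaMetricsGroup and declare the instruments; the Yammer registry
    // takes care of bookkeeping and JMX exposure.
    class MyComponentStats extends KafkaMetricsGroup {
      val messageRate     = newMeter("MessagesPerSec", "messages", TimeUnit.SECONDS)
      val requestSizeHist = newHistogram("RequestSize")
    }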

Removed:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
Modified:
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala
    incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
    incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
    incubator/kafka/branches/0.8/system_test/metrics.json

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Thu Sep 13 04:27:13 2012
@@ -105,7 +105,7 @@ case class FetchRequest(versionId: Short
                         replicaId: Int = FetchRequest.DefaultReplicaId,
                         maxWait: Int = FetchRequest.DefaultMaxWait,
                         minBytes: Int = FetchRequest.DefaultMinBytes,
-                        offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) {
+                        offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
 
   // ensure that a topic "X" appears in at most one OffsetDetail
   def validate() {
@@ -144,6 +144,8 @@ case class FetchRequest(versionId: Short
   def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
 
   def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
+
+  def isFromFollower(): Boolean = replicaId != FetchRequest.NonFollowerId
 }
 
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Thu Sep 13 04:27:13 2012
@@ -94,7 +94,7 @@ case class LeaderAndIsrRequest (versionI
                                 isInit: Boolean,
                                 ackTimeoutMs: Int,
                                 leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
-        extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
+        extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
   def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
     this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, isInit, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Thu Sep 13 04:27:13 2012
@@ -45,7 +45,7 @@ case class OffsetRequest(versionId: Shor
                     topic: String,
                     partition: Int,
                     time: Long,
-                    maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.Offsets)) {
+                    maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
   def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) =
     this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Thu Sep 13 04:27:13 2012
@@ -24,7 +24,7 @@ import kafka.utils._
 
 object ProducerRequest {
   val CurrentVersion: Short = 0
-
+  
   def readFrom(buffer: ByteBuffer): ProducerRequest = {
     val versionId: Short = buffer.getShort
     val correlationId: Int = buffer.getInt
@@ -58,7 +58,7 @@ case class ProducerRequest( versionId: S
                             clientId: String,
                             requiredAcks: Short,
                             ackTimeoutMs: Int,
-                            data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                            data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 
   def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) =
     this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala Thu Sep 13 04:27:13 2012
@@ -17,11 +17,36 @@
 
 package kafka.api
 
+import kafka.common.KafkaException
+import java.nio.ByteBuffer
+
 object RequestKeys {
-  val Produce: Short = 0
-  val Fetch: Short = 1
-  val Offsets: Short = 2
-  val TopicMetadata: Short = 3
-  val LeaderAndISRRequest: Short = 4
-  val StopReplicaRequest: Short = 5
+  val ProduceKey: Short = 0
+  val FetchKey: Short = 1
+  val OffsetsKey: Short = 2
+  val MetadataKey: Short = 3
+  val LeaderAndIsrKey: Short = 4
+  val StopReplicaKey: Short = 5
+
+  val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
+    Map( ProduceKey -> ("Produce", ProducerRequest.readFrom),
+         FetchKey -> ("Fetch", FetchRequest.readFrom),
+         OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
+         MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
+         LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
+         StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom) )
+
+  def nameForKey(key: Short): String = {
+    keyToNameAndDeserializerMap.get(key) match {
+      case Some(nameAndSerializer) => nameAndSerializer._1
+      case None => throw new KafkaException("Wrong request type %d".format(key))
+    }
+  }
+
+  def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = {
+    keyToNameAndDeserializerMap.get(key) match {
+      case Some(nameAndSerializer) => nameAndSerializer._2
+      case None => throw new KafkaException("Wrong request type %d".format(key))
+    }
+  }
 }
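
The new keyToNameAndDeserializerMap resolves, in one lookup, both the human-readable request name (used by the per-request metrics introduced in RequestChannel.scala below) and the wire deserializer. A minimal dispatcher sketch built only on the object above (the wrapper itself is hypothetical):

    import java.nio.ByteBuffer
    import kafka.api.{RequestKeys, RequestOrResponse}

    object RequestParseSketch {
      // Read the two-byte API key off the wire, then resolve the metric
      // name and the matching readFrom function in one map lookup.
      def parse(buffer: ByteBuffer): (String, RequestOrResponse) = {
        val key = buffer.getShort()
        val name = RequestKeys.nameForKey(key)                    // e.g. "Fetch"
        val request = RequestKeys.deserializerForKey(key)(buffer)
        (name, request)
      }
    }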

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Thu Sep 13 04:27:13 2012
@@ -45,7 +45,7 @@ case class StopReplicaRequest(versionId:
                               clientId: String,
                               ackTimeoutMs: Int,
                               stopReplicaSet: Set[(String, Int)])
-        extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
+        extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
   def this(stopReplicaSet: Set[(String, Int)]) = {
     this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet)
   }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Thu Sep 13 04:27:13 2012
@@ -78,7 +78,7 @@ case class TopicMetadataRequest(val vers
                                 val topics: Seq[String],
                                 val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
                                 val timestamp: Option[Long] = None, val count: Option[Int] = None)
- extends RequestOrResponse(Some(RequestKeys.TopicMetadata)){
+ extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
 
 def this(topics: Seq[String]) =
   this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Thu Sep 13 04:27:13 2012
@@ -22,6 +22,8 @@ import java.lang.Object
 import kafka.api.LeaderAndIsr
 import kafka.server.ReplicaManager
 import kafka.common.ErrorMapping
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
 
 /**
  * Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -29,7 +31,7 @@ import kafka.common.ErrorMapping
 class Partition(val topic: String,
                 val partitionId: Int,
                 time: Time,
-                val replicaManager: ReplicaManager) extends Logging {
+                val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
   private val localBrokerId = replicaManager.config.brokerId
   private val logManager = replicaManager.logManager
   private val replicaFetcherManager = replicaManager.replicaFetcherManager
@@ -45,6 +47,20 @@ class Partition(val topic: String,
 
   private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
 
+  newGauge(
+    topic + "-" + partitionId + "UnderReplicated",
+    new Gauge[Int] {
+      def value() = {
+        if (isUnderReplicated) 1 else 0
+      }
+    }
+  )
+
+  def isUnderReplicated(): Boolean = {
+    // TODO: need to pass in replication factor from controller
+    inSyncReplicas.size < replicaManager.config.defaultReplicationFactor
+  }
+
   def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
     val replicaOpt = getReplica(replicaId)
     replicaOpt match {
@@ -182,6 +198,7 @@ class Partition(val topic: String,
             info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
             // update ISR in ZK and cache
             updateISR(newInSyncReplicas)
+            replicaManager.isrExpandRate.mark()
           }
           maybeIncrementLeaderHW(leaderReplica)
         case None => // nothing to do if no longer leader
@@ -240,6 +257,7 @@ class Partition(val topic: String,
             updateISR(newInSyncReplicas)
             // we may need to increment high watermark since ISR could be down to 1
             maybeIncrementLeaderHW(leaderReplica)
+            replicaManager.isrShrinkRate.mark()
           }
         case None => // do nothing if no longer leader
       }
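
The per-partition UnderReplicated gauge above follows the standard Yammer pattern: register once at construction time and let reporters poll value() on demand. A standalone sketch against the raw Metrics 2.x API (class and field names here are illustrative, not from the patch):

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Gauge

    class IsrGaugeSketch(replicationFactor: Int) {
      @volatile var inSyncReplicaCount = replicationFactor

      // A 0/1 gauge: value() is evaluated whenever a reporter (e.g. JMX)
      // samples the metric, so it always reflects the current ISR size
      // without any explicit update calls.
      Metrics.newGauge(classOf[IsrGaugeSketch], "UnderReplicated",
        new Gauge[Int] {
          def value() = if (inSyncReplicaCount < replicationFactor) 1 else 0
        })
    }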

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Thu Sep 13 04:27:13 2012
@@ -36,7 +36,7 @@ class Replica(val brokerId: Int,
   val topic = partition.topic
   val partitionId = partition.partitionId
 
-  def logEndOffset_=(newLogEndOffset: Long) = {
+  def logEndOffset_=(newLogEndOffset: Long) {
     if (!isLocal) {
       logEndOffsetValue.set(newLogEndOffset)
       logEndOffsetUpdateTimeMsValue.set(time.milliseconds)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Thu Sep 13 04:27:13 2012
@@ -18,7 +18,7 @@
 package kafka.consumer
 
 import scala.collection._
-import kafka.utils.{Utils, Logging}
+import kafka.utils.Logging
 import kafka.serializer.{DefaultDecoder, Decoder}
 
 /**
@@ -64,8 +64,6 @@ trait ConsumerConnector {
 }
 
 object Consumer extends Logging {
-  private val consumerStatsMBeanName = "kafka:type=kafka.ConsumerStats"
-
   /**
    *  Create a ConsumerConnector
    *
@@ -74,7 +72,6 @@ object Consumer extends Logging {
    */
   def create(config: ConsumerConfig): ConsumerConnector = {
     val consumerConnect = new ZookeeperConsumerConnector(config)
-    Utils.registerMBean(consumerConnect, consumerStatsMBeanName)
     consumerConnect
   }
 
@@ -86,7 +83,6 @@ object Consumer extends Logging {
    */
   def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
     val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
-    Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName)
     consumerConnect
   }
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Thu Sep 13 04:27:13 2012
@@ -47,8 +47,8 @@ class ConsumerIterator[T](private val ch
     currentTopicInfo.resetConsumeOffset(consumedOffset)
     val topic = currentTopicInfo.topic
     trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
-    ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
-    ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1)
+    ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark()
+    ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
     item
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala Thu Sep 13 04:27:13 2012
@@ -17,44 +17,24 @@
 
 package kafka.consumer
 
-import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{Pool, Utils, threadsafe, Logging}
-
-trait ConsumerTopicStatMBean {
-  def getMessagesPerTopic: Long
-  def getBytesPerTopic: Long
-}
+import kafka.utils.{Pool, threadsafe, Logging}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
 
 @threadsafe
-class ConsumerTopicStat extends ConsumerTopicStatMBean {
-  private val numCumulatedMessagesPerTopic = new AtomicLong(0)
-  private val numCumulatedBytesPerTopic = new AtomicLong(0)
-
-  def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
-
-  def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
-
-  def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get
-
-  def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes)
+class ConsumerTopicStat(name: String) extends KafkaMetricsGroup {
+  val messageRate = newMeter(name + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
 object ConsumerTopicStat extends Logging {
-  private val stats = new Pool[String, ConsumerTopicStat]
-  private val allTopicStat = new ConsumerTopicStat
-  Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat")
+  private val valueFactory = (k: String) => new ConsumerTopicStat(k)
+  private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory))
+  private val allTopicStat = new ConsumerTopicStat("AllTopics")
 
   def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
 
   def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
-    var stat = stats.get(topic)
-    if (stat == null) {
-      stat = new ConsumerTopicStat
-      if (stats.putIfNotExists(topic, stat) == null)
-        Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." + topic)
-      else
-        stat = stats.get(topic)
-    }
-    return stat
+    stats.getAndMaybePut(topic + "-")
   }
 }
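
getAndMaybePut comes from the value-factory support added to kafka.utils.Pool in this patch: it fetches the per-topic stats object, creating it atomically on first use. The idiom, sketched over a plain ConcurrentHashMap (not the actual Pool code):

    import java.util.concurrent.ConcurrentHashMap

    class PoolSketch[K, V](valueFactory: K => V) {
      private val pool = new ConcurrentHashMap[K, V]

      // Under contention the factory may run more than once, but
      // putIfAbsent guarantees every caller sees the same winning value.
      def getAndMaybePut(key: K): V = {
        val current = pool.get(key)
        if (current != null) current
        else {
          pool.putIfAbsent(key, valueFactory(key))
          pool.get(key)
        }
      }
    }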

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Thu Sep 13 04:27:13 2012
@@ -21,7 +21,6 @@ import java.util.concurrent._
 import java.util.concurrent.atomic._
 import kafka.message._
 import kafka.utils.Logging
-import kafka.common.ErrorMapping
 
 private[consumer] class PartitionTopicInfo(val topic: String,
                                            val brokerId: Int,
@@ -59,8 +58,8 @@ private[consumer] class PartitionTopicIn
       chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
       val newOffset = fetchedOffset.addAndGet(size)
       debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
-      ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
-      ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
+      ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
+      ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
     }
   }
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Sep 13 04:27:13 2012
@@ -21,6 +21,8 @@ import kafka.api._
 import kafka.network._
 import kafka.utils._
 import kafka.common.ErrorMapping
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
 /**
  * A consumer of kafka messages
@@ -91,15 +93,13 @@ class SimpleConsumer( val host: String,
    *  @return a set of fetched messages
    */
   def fetch(request: FetchRequest): FetchResponse = {
-    val startTime = SystemTime.nanoseconds
-    val response = sendRequest(request)
+    var response: Receive = null
+    FetchRequestAndResponseStat.requestTimer.time {
+      response = sendRequest(request)
+    }
     val fetchResponse = FetchResponse.readFrom(response.buffer)
     val fetchedSize = fetchResponse.sizeInBytes
-
-    val endTime = SystemTime.nanoseconds
-    SimpleConsumerStats.recordFetchRequest(endTime - startTime)
-    SimpleConsumerStats.recordConsumptionThroughput(fetchedSize)
-
+    FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
     fetchResponse
   }
 
@@ -125,39 +125,7 @@ class SimpleConsumer( val host: String,
   }
 }
 
-trait SimpleConsumerStatsMBean {
-  def getFetchRequestsPerSecond: Double
-  def getAvgFetchRequestMs: Double
-  def getMaxFetchRequestMs: Double
-  def getNumFetchRequests: Long  
-  def getConsumerThroughput: Double
-}
-
-@threadsafe
-class SimpleConsumerStats(monitoringDurationNs: Long) extends SimpleConsumerStatsMBean {
-  private val fetchRequestStats = new SnapshotStats(monitoringDurationNs)
-
-  def recordFetchRequest(requestNs: Long) = fetchRequestStats.recordRequestMetric(requestNs)
-
-  def recordConsumptionThroughput(data: Long) = fetchRequestStats.recordThroughputMetric(data)
-
-  def getFetchRequestsPerSecond: Double = fetchRequestStats.getRequestsPerSecond
-
-  def getAvgFetchRequestMs: Double = fetchRequestStats.getAvgMetric / (1000.0 * 1000.0)
-
-  def getMaxFetchRequestMs: Double = fetchRequestStats.getMaxMetric / (1000.0 * 1000.0)
-
-  def getNumFetchRequests: Long = fetchRequestStats.getNumRequests
-
-  def getConsumerThroughput: Double = fetchRequestStats.getThroughput
-}
-
-object SimpleConsumerStats extends Logging {
-  private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats"
-  private val stats = new SimpleConsumerStats(1 * 1000L * 1000L * 1000L)
-  Utils.registerMBean(stats, simpleConsumerstatsMBeanName)
-
-  def recordFetchRequest(requestMs: Long) = stats.recordFetchRequest(requestMs)
-  def recordConsumptionThroughput(data: Long) = stats.recordConsumptionThroughput(data)
-}
-
+object FetchRequestAndResponseStat extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val respondSizeHist = newHistogram("FetchResponseSize")
+}
\ No newline at end of file
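
KafkaTimer (from kafka.metrics) is a thin call-by-name convenience around a Yammer Timer, so requestTimer.time { ... } both counts the request and records its latency. Against the raw Metrics 2.x API the equivalent is roughly (illustrative class name):

    import java.util.concurrent.TimeUnit
    import com.yammer.metrics.Metrics

    class FetchTimingSketch {
      private val timer = Metrics.newTimer(classOf[FetchTimingSketch],
        "FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS)

      def timedFetch[A](doFetch: => A): A = {
        val ctx = timer.time()   // start the clock and count the event
        try doFetch
        finally ctx.stop()       // record latency (ms) and rate (req/sec)
      }
    }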

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Sep 13 04:27:13 2012
@@ -32,6 +32,8 @@ import java.util.UUID
 import kafka.serializer.Decoder
 import kafka.utils.ZkUtils._
 import kafka.common.{KafkaException, NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
 
 
 /**
@@ -73,21 +75,9 @@ private[kafka] object ZookeeperConsumerC
   val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
 }
 
-/**
- *  JMX interface for monitoring consumer
- */
-trait ZookeeperConsumerConnectorMBean {
-  def getPartOwnerStats: String
-  def getConsumerGroup: String
-  def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
-  def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
-  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
-}
-
 private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
                                                 val enableFetcher: Boolean) // for testing only
-        extends ConsumerConnector with ZookeeperConsumerConnectorMBean
-        with Logging {
+        extends ConsumerConnector with Logging with KafkaMetricsGroup {
   private val isShuttingDown = new AtomicBoolean(false)
   private val rebalanceLock = new Object
   private var fetcher: Option[ConsumerFetcherManager] = None
@@ -260,58 +250,6 @@ private[kafka] class ZookeeperConsumerCo
     }
   }
 
-  // for JMX
-  def getPartOwnerStats(): String = {
-    val builder = new StringBuilder
-    for ((topic, infos) <- topicRegistry) {
-      builder.append("\n" + topic + ": [")
-      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-      for(partition <- infos.values) {
-        builder.append("\n    {")
-        builder.append{partition}
-        builder.append(",fetch offset:" + partition.getFetchOffset)
-        builder.append(",consumer offset:" + partition.getConsumeOffset)
-        builder.append("}")
-      }
-      builder.append("\n        ]")
-    }
-    builder.toString
-  }
-
-  // for JMX
-  def getConsumerGroup(): String = config.groupId
-
-  def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long =
-    getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId)
-
-  def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
-    val partitionInfos = topicRegistry.get(topic)
-    if (partitionInfos != null) {
-      val partitionInfo = partitionInfos.get(partitionId)
-      if (partitionInfo != null)
-        return partitionInfo.getConsumeOffset
-    }
-
-    // otherwise, try to get it from zookeeper
-    try {
-      val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
-      val znode = topicDirs.consumerOffsetDir + "/" + partitionId
-      val offsetString = readDataMaybeNull(zkClient, znode)._1
-      offsetString match {
-        case Some(offset) => offset.toLong
-        case None => -1L
-      }
-    }
-    catch {
-      case e =>
-        error("error in getConsumedOffset JMX ", e)
-        -2L
-    }
-  }
-
-  def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long =
-    earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime)
-
   private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = {
     var simpleConsumer: SimpleConsumer = null
     var producedOffset: Long = -1L
@@ -728,6 +666,12 @@ private[kafka] class ZookeeperConsumerCo
       val topicThreadId = e._1
       val q = e._2._1
       topicThreadIdAndQueues.put(topicThreadId, q)
+      newGauge(
+        config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+        new Gauge[Int] {
+          def value() = q.size
+        }
+      )
     })
 
     val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Thu Sep 13 04:27:13 2012
@@ -24,7 +24,7 @@ class ProducerRequest(val correlationId:
                       val clientId: String,
                       val requiredAcks: Short,
                       val ackTimeoutMs: Int,
-                      val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+                      val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
 	
   val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu Sep 13 04:27:13 2012
@@ -26,6 +26,8 @@ import java.text.NumberFormat
 import kafka.server.BrokerTopicStat
 import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet}
 import kafka.common.{KafkaException, InvalidMessageSizeException, OffsetOutOfRangeException}
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
 object Log {
   val FileSuffix = ".kafka"
@@ -130,7 +132,7 @@ class LogSegment(val file: File, val mes
 @threadsafe
 private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessageSize: Int, val flushInterval: Int,
                           val rollIntervalMs: Long, val needRecovery: Boolean, time: Time,
-                          brokerId: Int = 0) extends Logging {
+                          brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
 
   import kafka.log.Log._
@@ -147,9 +149,19 @@ private[kafka] class Log( val dir: File,
   /* The actual segments of the log */
   private[log] val segments: SegmentList[LogSegment] = loadSegments()
 
-  private val logStats = new LogStats(this)
-
-  Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
+  newGauge(
+    name + "-" + "NumLogSegments",
+    new Gauge[Int] {
+      def value() = numberOfSegments
+    }
+  )
+
+  newGauge(
+    name + "-" + "LogEndOffset",
+    new Gauge[Long] {
+      def value() = logEndOffset
+    }
+  )
 
   /* The name of this log */
   def name  = dir.getName()
@@ -243,9 +255,8 @@ private[kafka] class Log( val dir: File,
       numberOfMessages += 1;
     }
 
-    BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages)
-    BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
-    logStats.recordAppendedMessages(numberOfMessages)
+    BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(numberOfMessages)
+    BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(numberOfMessages)
 
     // truncate the message set's buffer upto validbytes, before appending it to the on-disk log
     val validByteBuffer = messages.buffer.duplicate()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Thu Sep 13 04:27:13 2012
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic._
 
 import kafka.utils._
 import kafka.common.KafkaException
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
 /**
  * An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
@@ -157,10 +159,9 @@ class FileMessageSet private[kafka](priv
    */
   def flush() = {
     checkMutable()
-    val startTime = SystemTime.milliseconds
-    channel.force(true)
-    val elapsedTime = SystemTime.milliseconds - startTime
-    LogFlushStats.recordFlushRequest(elapsedTime)
+    LogFlushStats.logFlushTimer.time {
+      channel.force(true)
+    }
   }
   
   /**
@@ -238,38 +239,8 @@ class FileMessageSet private[kafka](priv
     else
       next
   }
-  
-}
-
-trait LogFlushStatsMBean {
-  def getFlushesPerSecond: Double
-  def getAvgFlushMs: Double
-  def getTotalFlushMs: Long
-  def getMaxFlushMs: Double
-  def getNumFlushes: Long
 }
 
-@threadsafe
-class LogFlushStats(monitorDurationNs: Long) extends LogFlushStatsMBean {
-  private val flushRequestStats = new SnapshotStats(monitorDurationNs)
-
-  def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs)
-
-  def getFlushesPerSecond: Double = flushRequestStats.getRequestsPerSecond
-
-  def getAvgFlushMs: Double = flushRequestStats.getAvgMetric
-
-  def getTotalFlushMs: Long = flushRequestStats.getTotalMetric
-
-  def getMaxFlushMs: Double = flushRequestStats.getMaxMetric
-
-  def getNumFlushes: Long = flushRequestStats.getNumRequests
-}
-
-object LogFlushStats extends Logging {
-  private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
-  private val stats = new LogFlushStats(1L * 1000 * 1000 * 1000)
-  Utils.registerMBean(stats, LogFlushStatsMBeanName)
-
-  def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)
+object LogFlushStats extends KafkaMetricsGroup {
+  val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
 }

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala Thu Sep 13 04:27:13 2012
@@ -27,38 +27,14 @@ import com.yammer.metrics.Metrics
 trait KafkaMetricsGroup extends Logging {
 
   /**
-   * This method enables the user to form logical sub-groups of this
-   * KafkaMetricsGroup by inserting a sub-group identifier in the package
-   * string.
-   *
-   * @return The sub-group identifier.
-   */
-  def metricsGroupIdent: String = ""
-
-  /**
    * Creates a new MetricName object for gauges, meters, etc. created for this
-   * metrics group. It uses the metricsGroupIdent to create logical sub-groups.
-   * This is currently specifically of use to classes under kafka, with
-   * broker-id being the most common metrics grouping strategy.
-   *
+   * metrics group.
    * @param name Descriptive name of the metric.
    * @return Sanitized metric name object.
    */
   private def metricName(name: String) = {
-    val ident = metricsGroupIdent
     val klass = this.getClass
-    val pkg = {
-      val actualPkg = if (klass.getPackage == null) "" else klass.getPackage.getName
-      if (ident.nonEmpty) {
-        // insert the sub-group identifier after the top-level package
-        if (actualPkg.contains("."))
-          actualPkg.replaceFirst("""\.""", ".%s.".format(ident))
-        else
-          actualPkg + "." + ident
-      }
-      else
-        actualPkg
-    }
+    val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
     val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
     new MetricName(pkg, simpleName, name)
   }
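
With the sub-group indirection gone, a metric's identity is derived purely from its declaring class: group = package, type = simple class name, name = the label passed in. For example, a gauge registered from kafka.network.RequestChannel would carry a MetricName like the sketch below (the JMX rendering is how Yammer's JmxReporter typically exposes it):

    import com.yammer.metrics.core.MetricName

    object NamingSketch {
      // Expected JMX ObjectName (approximately):
      //   "kafka.network":type="RequestChannel",name="RequestQueueSize"
      val queueSizeName = new MetricName("kafka.network", "RequestChannel", "RequestQueueSize")
    }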

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Thu Sep 13 04:27:13 2012
@@ -19,23 +19,78 @@ package kafka.network
 
 import java.util.concurrent._
 import kafka.utils.SystemTime
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import java.nio.ByteBuffer
+import kafka.api._
+
+object RequestChannel {
+  val AllDone = new Request(1, 2, getShutdownReceive(), 0)
+
+  def getShutdownReceive() = {
+    val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Array[TopicData]())
+    val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
+    byteBuffer.putShort(RequestKeys.ProduceKey)
+    emptyProducerRequest.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    byteBuffer
+  }
+
+  case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeNs: Long) {
+    var dequeueTimeNs = -1L
+    var apiLocalCompleteTimeNs = -1L
+    var responseCompleteTimeNs = -1L
+    val requestId = buffer.getShort()
+    val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
+    buffer.rewind()
+
+    def updateRequestMetrics() {
+      val endTimeNs = SystemTime.nanoseconds
+      val queueTime = (dequeueTimeNs - startTimeNs).max(0L)
+      val apiLocalTime = (apiLocalCompleteTimeNs - dequeueTimeNs).max(0L)
+      val apiRemoteTime = (responseCompleteTimeNs - apiLocalCompleteTimeNs).max(0L)
+      val responseSendTime = (endTimeNs - responseCompleteTimeNs).max(0L)
+      val totalTime = endTimeNs - startTimeNs
+      var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
+      if (requestId == RequestKeys.FetchKey) {
+        val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
+        metricsList ::= ( if (isFromFollower)
+                            RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
+                          else
+                            RequestMetrics.metricsMap(RequestMetrics.consumerFetchMetricName) )
+      }
+      metricsList.foreach{
+        m => m.requestRate.mark()
+             m.queueTimeHist.update(queueTime)
+             m.localTimeHist.update(apiLocalTime)
+             m.remoteTimeHist.update(apiRemoteTime)
+             m.responseSendTimeHist.update(responseSendTime)
+             m.totalTimeHist.update(totalTime)
+      }
+    }
+  }
+  
+  case class Response(processor: Int, request: Request, responseSend: Send) {
+    request.responseCompleteTimeNs = SystemTime.nanoseconds
 
-object RequestChannel { 
-  val AllDone = new Request(1, 2, null, 0)
-  case class Request(processor: Int, requestKey: Any, request: Receive, start: Long)
-  case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsedNs: Long) {
     def this(request: Request, send: Send) =
-      this(request.processor, request.requestKey, send, request.start, SystemTime.nanoseconds - request.start)
+      this(request.processor, request, send)
   }
 }
 
-class RequestChannel(val numProcessors: Int, val queueSize: Int) { 
+class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
   private var responseListeners: List[(Int) => Unit] = Nil
   private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
   private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
   for(i <- 0 until numProcessors)
     responseQueues(i) = new ArrayBlockingQueue[RequestChannel.Response](queueSize)
-    
+
+  newGauge(
+    "RequestQueueSize",
+    new Gauge[Int] {
+      def value() = requestQueue.size
+    }
+  )
 
   /** Send a request to be handled, potentially blocking until there is room in the queue for the request */
   def sendRequest(request: RequestChannel.Request) {
@@ -60,5 +115,26 @@ class RequestChannel(val numProcessors: 
   def addResponseListener(onResponse: Int => Unit) { 
     responseListeners ::= onResponse
   }
+}
+
+object RequestMetrics {
+  val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
+  val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
+  val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+  (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
+    ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
+}
 
+class RequestMetrics(name: String) extends KafkaMetricsGroup {
+  val requestRate = newMeter(name + "-RequestsPerSec",  "requests", TimeUnit.SECONDS)
+  // time a request spent in a request queue
+  val queueTimeHist = newHistogram(name + "-QueueTimeNs")
+  // time a request takes to be processed at the local broker
+  val localTimeHist = newHistogram(name + "-LocalTimeNs")
+  // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
+  val remoteTimeHist = newHistogram(name + "-RemoteTimeNs")
+  // time to send the response to the requester
+  val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeNs")
+  val totalTimeHist = newHistogram(name + "-TotalTimeNs")
 }
+
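
updateRequestMetrics() above slices each request's life into four phases using the timestamps stamped along the way. The arithmetic, pulled out as a standalone sketch (all values in nanoseconds; max(0L) guards phases that were never reached):

    case class RequestTimelineSketch(startNs: Long, dequeueNs: Long,
                                     apiLocalDoneNs: Long,
                                     responseDoneNs: Long, endNs: Long) {
      def queueTime        = (dequeueNs - startNs).max(0L)             // waiting in the request queue
      def apiLocalTime     = (apiLocalDoneNs - dequeueNs).max(0L)      // processing on this broker
      def apiRemoteTime    = (responseDoneNs - apiLocalDoneNs).max(0L) // waiting on other brokers
      def responseSendTime = (endNs - responseDoneNs).max(0L)          // writing the response out
      def totalTime        = endNs - startNs
    }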

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Thu Sep 13 04:27:13 2012
@@ -41,7 +41,6 @@ class SocketServer(val brokerId: Int,
   private val time = SystemTime
   private val processors = new Array[Processor](numProcessorThreads)
   private var acceptor: Acceptor = new Acceptor(port, processors)
-  val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
   val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
 
   /**
@@ -49,7 +48,7 @@ class SocketServer(val brokerId: Int,
    */
   def startup() {
     for(i <- 0 until numProcessorThreads) {
-      processors(i) = new Processor(i, time, maxRequestSize, requestChannel, stats)
+      processors(i) = new Processor(i, time, maxRequestSize, requestChannel)
       Utils.newThread("kafka-processor-%d-%d".format(port, i), processors(i), false).start()
     }
     // register the processor threads for notification of responses
@@ -187,8 +186,7 @@ private[kafka] class Acceptor(val port: 
 private[kafka] class Processor(val id: Int,
                                val time: Time, 
                                val maxRequestSize: Int,
-                               val requestChannel: RequestChannel,
-                               val stats: SocketServerStats) extends AbstractServerThread {
+                               val requestChannel: RequestChannel) extends AbstractServerThread {
   
   private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
 
@@ -240,10 +238,10 @@ private[kafka] class Processor(val id: I
     var curr = requestChannel.receiveResponse(id)
     while(curr != null) {
       trace("Socket server received response to send, registering for write: " + curr)
-      val key = curr.requestKey.asInstanceOf[SelectionKey]
+      val key = curr.request.requestKey.asInstanceOf[SelectionKey]
       try {
         key.interestOps(SelectionKey.OP_WRITE)
-        key.attach(curr.response)
+        key.attach(curr)
       } catch {
         case e: CancelledKeyException => {
           debug("Ignoring response for closed socket.")
@@ -288,18 +286,17 @@ private[kafka] class Processor(val id: I
    */
   def read(key: SelectionKey) {
     val socketChannel = channelFor(key)
-    var request = key.attachment.asInstanceOf[Receive]
+    var receive = key.attachment.asInstanceOf[Receive]
     if(key.attachment == null) {
-      request = new BoundedByteBufferReceive(maxRequestSize)
-      key.attach(request)
+      receive = new BoundedByteBufferReceive(maxRequestSize)
+      key.attach(receive)
     }
-    val read = request.readFrom(socketChannel)
-    stats.recordBytesRead(read)
+    val read = receive.readFrom(socketChannel)
     trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
     if(read < 0) {
       close(key)
-    } else if(request.complete) {
-      val req = RequestChannel.Request(processor = id, requestKey = key, request = request, start = time.nanoseconds)
+    } else if(receive.complete) {
+      val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeNs = time.nanoseconds)
       requestChannel.sendRequest(req)
       trace("Received request, sending for processing by handler: " + req)
       key.attach(null)
@@ -315,13 +312,14 @@ private[kafka] class Processor(val id: I
    */
   def write(key: SelectionKey) {
     val socketChannel = channelFor(key)
-    var response = key.attachment().asInstanceOf[Send]
-    if(response == null)
+    val response = key.attachment().asInstanceOf[RequestChannel.Response]
+    val responseSend = response.responseSend
+    if(responseSend == null)
       throw new IllegalStateException("Registered for write interest but no response attached to key.")
-    val written = response.writeTo(socketChannel)
-    stats.recordBytesWritten(written)
+    val written = responseSend.writeTo(socketChannel)
     trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
-    if(response.complete) {
+    if(responseSend.complete) {
+      response.request.updateRequestMetrics()
       key.attach(null)
       key.interestOps(SelectionKey.OP_READ)
     } else {

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Thu Sep 13 04:27:13 2012
@@ -20,8 +20,9 @@ import async.{AsyncProducerStats, Defaul
 import kafka.utils._
 import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
 import kafka.serializer.Encoder
-import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
+import java.util.concurrent.atomic.AtomicBoolean
 import kafka.common.{QueueFullException, InvalidConfigException}
+import kafka.metrics.KafkaMetricsGroup
 
 class Producer[K,V](config: ProducerConfig,
                     private val eventHandler: EventHandler[K,V]) // for testing only
@@ -68,8 +69,10 @@ extends Logging {
   }
 
   private def recordStats(producerData: ProducerData[K,V]*) {
-    for (data <- producerData)
-      ProducerTopicStat.getProducerTopicStat(data.getTopic).recordMessagesPerTopic(data.getData.size)
+    for (data <- producerData) {
+      ProducerTopicStat.getProducerTopicStat(data.getTopic).messageRate.mark(data.getData.size)
+      ProducerTopicStat.getProducerAllTopicStat.messageRate.mark(data.getData.size)
+    }
   }
 
   private def asyncSend(producerData: ProducerData[K,V]*) {
@@ -93,7 +96,7 @@ extends Logging {
           }
       }
       if(!added) {
-        AsyncProducerStats.recordDroppedEvents
+        AsyncProducerStats.droppedMessageRate.mark()
         error("Event queue is full of unsent messages, could not send event: " + data.toString)
         throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString)
       }else {
@@ -118,31 +121,27 @@ extends Logging {
   }
 }
 
-trait ProducerTopicStatMBean {
-  def getMessagesPerTopic: Long
-}
-
 @threadsafe
-class ProducerTopicStat extends ProducerTopicStatMBean {
-  private val numCumulatedMessagesPerTopic = new AtomicLong(0)
-
-  def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
-
-  def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
+class ProducerTopicStat(name: String) extends KafkaMetricsGroup {
+  val messageRate = newMeter(name + "MessagesPerSec",  "messages", TimeUnit.SECONDS)
+  val byteRate = newMeter(name + "BytesPerSec",  "bytes", TimeUnit.SECONDS)
 }
 
-object ProducerTopicStat extends Logging {
-  private val stats = new Pool[String, ProducerTopicStat]
+object ProducerTopicStat {
+  private val valueFactory = (k: String) => new ProducerTopicStat(k)
+  private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory))
+  private val allTopicStat = new ProducerTopicStat("AllTopics")
+
+  def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat
 
   def getProducerTopicStat(topic: String): ProducerTopicStat = {
-    var stat = stats.get(topic)
-    if (stat == null) {
-      stat = new ProducerTopicStat
-      if (stats.putIfNotExists(topic, stat) == null)
-        Utils.registerMBean(stat, "kafka.producer.Producer:type=kafka.ProducerTopicStat." + topic)
-      else
-        stat = stats.get(topic)
-    }
-    return stat
+    stats.getAndMaybePut(topic + "-")
   }
 }
+
+object ProducerStats extends KafkaMetricsGroup {
+  val serializationErrorRate = newMeter("SerializationErrorsPerSec",  "errors", TimeUnit.SECONDS)
+  val resendRate = newMeter( "ResendsPerSec",  "resends", TimeUnit.SECONDS)
+  val failedSendRate = newMeter("FailedSendsPerSec",  "failed sends", TimeUnit.SECONDS)
+}
+
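
Producer-side topic stats now mirror the consumer side: each send is marked on both the topic-specific meter and the "AllTopics" aggregate, as recordStats above does. A usage sketch (the wrapper object, topic, and batch size are made up):

    import kafka.producer.ProducerTopicStat

    object ProducerStatSketch {
      def record(topic: String, batchSize: Int) {
        ProducerTopicStat.getProducerTopicStat(topic).messageRate.mark(batchSize)
        ProducerTopicStat.getProducerAllTopicStat.messageRate.mark(batchSize)
      }
    }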

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Thu Sep 13 04:27:13 2012
@@ -18,11 +18,12 @@
 package kafka.producer
 
 import kafka.api._
-import kafka.message.MessageSet
 import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
 import kafka.utils._
 import java.util.Random
-import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
+import kafka.common.ErrorMapping
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
 object SyncProducer {
   val RequestKey: Short = 0
@@ -57,7 +58,7 @@ class SyncProducer(val config: SyncProdu
       val buffer = new BoundedByteBufferSend(request).buffer
       trace("verifying sendbuffer of size " + buffer.limit)
       val requestTypeId = buffer.getShort()
-      if(requestTypeId == RequestKeys.Produce) {
+      if(requestTypeId == RequestKeys.ProduceKey) {
         val request = ProducerRequest.readFrom(buffer)
         trace(request.toString)
       }
@@ -92,7 +93,6 @@ class SyncProducer(val config: SyncProdu
         sentOnConnection = 0
         lastConnectionTime = System.currentTimeMillis
       }
-      SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime)
       response
     }
   }
@@ -101,7 +101,11 @@ class SyncProducer(val config: SyncProdu
    * Send a message
    */
   def send(producerRequest: ProducerRequest): ProducerResponse = {
-    val response = doSend(producerRequest)
+    ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
+    var response: Receive = null
+    ProducerRequestStat.requestTimer.time {
+      response = doSend(producerRequest)
+    }
     ProducerResponse.readFrom(response.buffer)
   }
 
@@ -171,34 +175,7 @@ class SyncProducer(val config: SyncProdu
   }
 }
 
-trait SyncProducerStatsMBean {
-  def getProduceRequestsPerSecond: Double
-  def getAvgProduceRequestMs: Double
-  def getMaxProduceRequestMs: Double
-  def getNumProduceRequests: Long
-}
-
-@threadsafe
-class SyncProducerStats(monitoringDurationNs: Long) extends SyncProducerStatsMBean {
-  private val produceRequestStats = new SnapshotStats(monitoringDurationNs)
-
-  def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs)
-
-  def getProduceRequestsPerSecond: Double = produceRequestStats.getRequestsPerSecond
-
-  def getAvgProduceRequestMs: Double = produceRequestStats.getAvgMetric / (1000.0 * 1000.0)
-
-  def getMaxProduceRequestMs: Double = produceRequestStats.getMaxMetric / (1000.0 * 1000.0)
-
-  def getNumProduceRequests: Long = produceRequestStats.getNumRequests
-}
-
-object SyncProducerStats extends Logging {
-  private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
-  private val stats = new SyncProducerStats(1L * 1000 * 1000 * 1000)
-  swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
-
-  def recordProduceRequest(requestMs: Long) = {
-    stats.recordProduceRequest(requestMs)
-  }
-}
+object ProducerRequestStat extends KafkaMetricsGroup {
+  val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+  val requestSizeHist = newHistogram("ProducerRequestSize")
+}
\ No newline at end of file
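
send() now records the request size in a histogram and wraps doSend in a
KafkaTimer, so a single metric carries both the produce-request rate and
the latency distribution. A sketch of the KafkaTimer idea, assuming the
Yammer metrics-core 2.x Timer/TimerContext API (the real class is
kafka.metrics.KafkaTimer):

    import com.yammer.metrics.core.Timer

    class TimerSketch(underlying: Timer) {
      // Time a by-name block; the context is stopped in a finally so a
      // doSend that throws still contributes to the latency histogram.
      def time[A](f: => A): A = {
        val ctx = underlying.time()
        try f
        finally ctx.stop()
      }
    }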

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala Thu Sep 13 04:27:13 2012
@@ -17,22 +17,9 @@
 
 package kafka.producer.async
 
-import java.util.concurrent.atomic.AtomicInteger
-import kafka.utils.Utils
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
 
-class AsyncProducerStats extends AsyncProducerStatsMBean {
-  val droppedEvents = new AtomicInteger(0)
-
-  def getAsyncProducerDroppedEvents: Int = droppedEvents.get
-
-  def recordDroppedEvents = droppedEvents.getAndAdd(1)
-}
-
-object AsyncProducerStats {
-  private val stats = new AsyncProducerStats
-  val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats"
-
-  Utils.registerMBean(stats, ProducerMBeanName)
-
-  def recordDroppedEvents = stats.recordDroppedEvents
+object AsyncProducerStats extends KafkaMetricsGroup {
+  val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
 }
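
Roughly what the newMeter helper amounts to, assuming the metrics-core 2.x
registry API (the real KafkaMetricsGroup derives the MetricName from the
group's package and class, so the names below are illustrative):

    import com.yammer.metrics.Metrics
    import java.util.concurrent.TimeUnit

    object DroppedMessagesSketch {
      private val meter = Metrics.defaultRegistry().newMeter(
        getClass, "DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)

      // mark() bumps the event count; the meter derives the mean and the
      // 1/5/15-minute moving rates from it.
      def record(n: Long = 1L) { meter.mark(n) }
    }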

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Thu Sep 13 04:27:13 2012
@@ -24,7 +24,7 @@ import kafka.serializer.Encoder
 import kafka.utils.{Utils, Logging}
 import scala.collection.Map
 import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData}
+import kafka.api._
 
 
 class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -33,6 +33,7 @@ class DefaultEventHandler[K,V](config: P
                                private val producerPool: ProducerPool,
                                private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
   extends EventHandler[K,V] with Logging {
+  val isSync = ("sync" == config.producerType)
 
   val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
 
@@ -41,6 +42,11 @@ class DefaultEventHandler[K,V](config: P
   def handle(events: Seq[ProducerData[K,V]]) {
     lock synchronized {
       val serializedData = serialize(events)
+      serializedData.foreach { pd =>
+        val dataSize = pd.data.foldLeft(0)(_ + _.payloadSize)
+        ProducerTopicStat.getProducerTopicStat(pd.topic).byteRate.mark(dataSize)
+        ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+      }
       var outstandingProduceRequests = serializedData
       var remainingRetries = config.producerRetries + 1
       while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
@@ -51,9 +57,11 @@ class DefaultEventHandler[K,V](config: P
           // get topics of the outstanding produce requests and refresh metadata for those
           Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
           remainingRetries -= 1
+          ProducerStats.resendRate.mark()
         }
       }
       if(outstandingProduceRequests.size > 0) {
+        ProducerStats.failedSendRate.mark()
         error("Failed to send the following requests: " + outstandingProduceRequests)
         throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
       }
@@ -90,7 +98,28 @@ class DefaultEventHandler[K,V](config: P
   }
 
   def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
-    events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m))))
+    val serializedProducerData = new ListBuffer[ProducerData[K,Message]]
+    events.foreach {e =>
+      val serializedMessages = new ListBuffer[Message]
+      for (d <- e.getData) {
+        try {
+          serializedMessages += encoder.toMessage(d)
+        } catch {
+          case t =>
+            ProducerStats.serializationErrorRate.mark()
+            if (isSync)
+              throw t
+            else {
+              // currently, if in async mode, we just log the serialization error. We need to revisit
+              // this when doing kafka-496
+              error("Error serializing message ", t)
+            }
+        }
+      }
+      if (serializedMessages.size > 0)
+        serializedProducerData += new ProducerData[K,Message](e.getTopic, e.getKey, serializedMessages)
+    }
+    serializedProducerData
   }
 
   def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = {
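
The rewritten serialize() no longer fails a whole batch on one bad message:
in sync mode the serialization error still propagates to the caller, while
in async mode the message is counted via serializationErrorRate, logged,
and dropped (to be revisited in KAFKA-496). A hypothetical encoder to
illustrate the split:

    import kafka.message.Message
    import kafka.serializer.Encoder

    // Hypothetical encoder, for illustration only: rejects one input.
    class FlakyEncoder extends Encoder[String] {
      def toMessage(event: String): Message =
        if (event == "bad") throw new RuntimeException("unserializable")
        else new Message(event.getBytes)
    }

With producer.type=sync a batch containing "bad" throws to the caller; with
producer.type=async "bad" is dropped and the remaining messages still reach
partitionAndCollate.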

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Thu Sep 13 04:27:13 2012
@@ -21,16 +21,25 @@ import kafka.utils.{SystemTime, Logging}
 import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
 import collection.mutable.ListBuffer
 import kafka.producer.ProducerData
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
 
 class ProducerSendThread[K,V](val threadName: String,
                               val queue: BlockingQueue[ProducerData[K,V]],
                               val handler: EventHandler[K,V],
                               val queueTime: Long,
-                              val batchSize: Int) extends Thread(threadName) with Logging {
+                              val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
 
   private val shutdownLatch = new CountDownLatch(1)
   private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]])
 
+  newGauge(
+    "ProducerQueueSize-" + getId,
+    new Gauge[Int] {
+      def value() = queue.size
+    }
+  )
+
   override def run {
 
     try {
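
Each send thread now exports its queue depth as a pull-based gauge keyed by
thread id. A sketch of what that registration amounts to, assuming the
metrics-core 2.x static registry (the real newGauge helper scopes the
MetricName to the owning class):

    import com.yammer.metrics.Metrics
    import com.yammer.metrics.core.Gauge
    import java.util.concurrent.LinkedBlockingQueue

    val queue = new LinkedBlockingQueue[String]()
    // value() is evaluated at report time, so reporters always see the
    // current queue depth rather than a snapshot taken at registration.
    Metrics.defaultRegistry().newGauge(
      classOf[LinkedBlockingQueue[_]], "QueueSizeSketch",
      new Gauge[Int] { def value() = queue.size })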

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Thu Sep 13 04:27:13 2012
@@ -23,7 +23,11 @@ import kafka.common.ErrorMapping
 import collection.mutable
 import kafka.message.ByteBufferMessageSet
 import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
-import kafka.utils.ShutdownableThread
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.atomic.AtomicLong
+import kafka.utils.{Pool, ShutdownableThread}
+import java.util.concurrent.TimeUnit
 
 
 /**
@@ -35,6 +39,8 @@ abstract class  AbstractFetcherThread(na
   private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map
   private val fetchMapLock = new Object
   val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
+  val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
+
   // callbacks to be defined in subclass
 
   // process fetched data
@@ -79,6 +85,7 @@ abstract class  AbstractFetcherThread(na
           }
         }
     }
+    fetcherMetrics.requestRate.mark()
 
     if (response != null) {
       // process fetched data
@@ -93,8 +100,11 @@ abstract class  AbstractFetcherThread(na
               partitionData.error match {
                 case ErrorMapping.NoError =>
                   processPartitionData(topic, currentOffset.get, partitionData)
-                  val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+                  val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+                  val newOffset = currentOffset.get + validBytes
                   fetchMap.put(key, newOffset)
+                  FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
+                  fetcherMetrics.byteRate.mark(validBytes)
                 case ErrorMapping.OffsetOutOfRangeCode =>
                   val newOffset = handleOffsetOutOfRange(topic, partitionId)
                   fetchMap.put(key, newOffset)
@@ -140,4 +150,43 @@ abstract class  AbstractFetcherThread(na
       fetchMap.size
     }
   }
-}
\ No newline at end of file
+}
+
+class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
+  private[this] val lagVal = new AtomicLong(-1L)
+  newGauge(
+    name._1 + "-" + name._2 + "-ConsumerLag",
+    new Gauge[Long] {
+      def value() = lagVal.get
+    }
+  )
+
+  def lag_=(newLag: Long) {
+    lagVal.set(newLag)
+  }
+
+  def lag = lagVal.get
+}
+
+object FetcherLagMetrics {
+  private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k)
+  private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory))
+
+  def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = {
+    stats.getAndMaybePut( (topic, partitionId) )
+  }
+}
+
+class FetcherStat(name: String) extends KafkaMetricsGroup {
+  val requestRate = newMeter(name + "RequestsPerSec", "requests", TimeUnit.SECONDS)
+  val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+}
+
+object FetcherStat {
+  private val valueFactory = (k: String) => new FetcherStat(k)
+  private val stats = new Pool[String, FetcherStat](Some(valueFactory))
+
+  def getFetcherStat(name: String): FetcherStat = {
+    stats.getAndMaybePut(name)
+  }
+}
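
A short usage sketch of the new lag gauge: the lag_= setter writes the
backing AtomicLong and the registered gauge reads it on demand, so each
fetched partition shows up as a "<topic>-<partition>-ConsumerLag" gauge
(the topic and partition below are made up):

    val lagMetrics = FetcherLagMetrics.getFetcherLagMetrics("events", 3)
    lagMetrics.lag = 1024L           // lag_=(1024L), i.e. lagVal.set(1024L)
    assert(lagMetrics.lag == 1024L)  // the gauge's value() reports the same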

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Sep 13 04:27:13 2012
@@ -40,8 +40,6 @@ class KafkaApis(val requestChannel: Requ
                 val replicaManager: ReplicaManager,
                 val zkClient: ZkClient,
                 brokerId: Int) extends Logging {
-
-  private val metricsGroup = brokerId.toString
   private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
   private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
   private val delayedRequestMetrics = new DelayedRequestMetrics
@@ -54,20 +52,20 @@ class KafkaApis(val requestChannel: Requ
    * Top-level method that handles all requests and multiplexes to the right api
    */
   def handle(request: RequestChannel.Request) {
-    val apiId = request.request.buffer.getShort()
-    apiId match {
-      case RequestKeys.Produce => handleProducerRequest(request)
-      case RequestKeys.Fetch => handleFetchRequest(request)
-      case RequestKeys.Offsets => handleOffsetRequest(request)
-      case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request)
-      case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request)
-      case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request)
-      case _ => throw new KafkaException("No mapping found for handler id " + apiId)
+    request.requestId match {
+      case RequestKeys.ProduceKey => handleProducerRequest(request)
+      case RequestKeys.FetchKey => handleFetchRequest(request)
+      case RequestKeys.OffsetsKey => handleOffsetRequest(request)
+      case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
+      case RequestKeys.LeaderAndIsrKey => handleLeaderAndISRRequest(request)
+      case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+      case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
     }
+    request.apiLocalCompleteTimeNs = SystemTime.nanoseconds
   }
 
   def handleLeaderAndISRRequest(request: RequestChannel.Request){
-    val leaderAndISRRequest = LeaderAndIsrRequest.readFrom(request.request.buffer)
+    val leaderAndISRRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
     trace("Handling leader and isr request " + leaderAndISRRequest)
@@ -79,7 +77,7 @@ class KafkaApis(val requestChannel: Requ
 
 
   def handleStopReplicaRequest(request: RequestChannel.Request){
-    val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
+    val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
     trace("Handling stop replica request " + stopReplicaRequest)
@@ -107,11 +105,6 @@ class KafkaApis(val requestChannel: Requ
     for(fetchReq <- satisfied) {
       val topicData = readMessageSets(fetchReq.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
-
-      val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId
-      delayedRequestMetrics.recordDelayedFetchSatisfied(
-        fromFollower, SystemTime.nanoseconds - fetchReq.creationTimeNs, response)
-
       requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
     }
   }
@@ -120,7 +113,7 @@ class KafkaApis(val requestChannel: Requ
    * Handle a produce request
    */
   def handleProducerRequest(request: RequestChannel.Request) {
-    val produceRequest = ProducerRequest.readFrom(request.request.buffer)
+    val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
     val sTime = SystemTime.milliseconds
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling producer request " + request.toString)
@@ -179,8 +172,8 @@ class KafkaApis(val requestChannel: Requ
     for(topicData <- request.data) {
       for(partitionData <- topicData.partitionDataArray) {
         msgIndex += 1
-        BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
-        BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
+        BrokerTopicStat.getBrokerTopicStat(topicData.topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
+        BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
         try {
           val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
           val log = localReplica.log.get
@@ -193,8 +186,8 @@ class KafkaApis(val requestChannel: Requ
             .format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
         } catch {
           case e =>
-            BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
-            BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+            BrokerTopicStat.getBrokerTopicStat(topicData.topic).failedProduceRequestRate.mark()
+            BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
             error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
             e match {
               case _: IOException =>
@@ -214,7 +207,7 @@ class KafkaApis(val requestChannel: Requ
    * Handle a fetch request
    */
   def handleFetchRequest(request: RequestChannel.Request) {
-    val fetchRequest = FetchRequest.readFrom(request.request.buffer)
+    val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling fetch request " + fetchRequest.toString)
     trace("Handling fetch request " + fetchRequest.toString)
@@ -229,7 +222,7 @@ class KafkaApis(val requestChannel: Requ
         requestChannel.sendResponse(channelResponse)
     }
 
-    if(fetchRequest.replicaId != FetchRequest.NonFollowerId) {
+    if(fetchRequest.isFromFollower) {
       maybeUpdatePartitionHW(fetchRequest)
       // after updating HW, some delayed produce requests may be unblocked
       var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
@@ -272,7 +265,7 @@ class KafkaApis(val requestChannel: Requ
         debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
         try {
           val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i))
-          val end = if(fetchRequest.replicaId == FetchRequest.NonFollowerId) {
+          val end = if (!fetchRequest.isFromFollower) {
             leader.highWatermark
           } else {
             leader.logEndOffset
@@ -317,12 +310,12 @@ class KafkaApis(val requestChannel: Requ
       val topic = offsetDetail.topic
       val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
       for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
-        val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId
+        val isFetchFromFollower = fetchRequest.isFromFollower
         val partitionInfo =
           try {
             val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
-            BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
-            BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
+            BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
+            BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
             if (!isFetchFromFollower) {
               new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
             } else {
@@ -335,8 +328,8 @@ class KafkaApis(val requestChannel: Requ
           }
           catch {
             case e =>
-              BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
-              BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
+              BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
+              BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
               error("error when processing request " + (topic, partition, offset, fetchSize), e)
               new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
                                 offset, -1L, MessageSet.Empty)
@@ -375,7 +368,7 @@ class KafkaApis(val requestChannel: Requ
    * Service the offset request API 
    */
   def handleOffsetRequest(request: RequestChannel.Request) {
-    val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
+    val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling offset request " + offsetRequest.toString)
     trace("Handling offset request " + offsetRequest.toString)
@@ -402,7 +395,7 @@ class KafkaApis(val requestChannel: Requ
    * Service the topic metadata request API
    */
   def handleTopicMetadataRequest(request: RequestChannel.Request) {
-    val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
+    val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
     if(requestLogger.isTraceEnabled)
       requestLogger.trace("Handling topic metadata request " + metadataRequest.toString())
     trace("Handling topic metadata request " + metadataRequest.toString())
@@ -456,7 +449,7 @@ class KafkaApis(val requestChannel: Requ
     def keyLabel: String
   }
   private [kafka] object MetricKey {
-    val globalLabel = "all"
+    val globalLabel = "All"
   }
 
   private [kafka] case class RequestKey(topic: String, partition: Int)
@@ -476,7 +469,6 @@ class KafkaApis(val requestChannel: Requ
 
     this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
 
-
     /**
      * A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
      */
@@ -489,8 +481,8 @@ class KafkaApis(val requestChannel: Requ
     def expire(delayed: DelayedFetch) {
       val topicData = readMessageSets(delayed.fetch)
       val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
-      val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId
-      delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response)
+      val fromFollower = delayed.fetch.isFromFollower
+      delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
       requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
     }
   }
@@ -560,7 +552,6 @@ class KafkaApis(val requestChannel: Requ
       val partitionId = followerFetchRequestKey.partition
       val key = RequestKey(topic, partitionId)
       val fetchPartitionStatus = partitionStatus(key)
-      val durationNs = SystemTime.nanoseconds - creationTimeNs
       trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
         .format(topic, partitionId, fetchPartitionStatus.acksPending))
       if (fetchPartitionStatus.acksPending) {
@@ -576,17 +567,12 @@ class KafkaApis(val requestChannel: Requ
         if (!fetchPartitionStatus.acksPending) {
           val topicData = produce.data.find(_.topic == topic).get
           val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get
-          delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
-                                                                 durationNs,
-                                                                 partitionData.sizeInBytes)
           maybeUnblockDelayedFetchRequests(topic, Array(partitionData))
         }
       }
 
       // unblocked if there are no partitions with pending acks
       val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
-      if (satisfied)
-        delayedRequestMetrics.recordDelayedProduceSatisfied(durationNs)
       satisfied
     }
 
@@ -629,53 +615,18 @@ class KafkaApis(val requestChannel: Requ
 
   private class DelayedRequestMetrics {
     private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
-      val caughtUpFollowerFetchRequestMeter =
-        newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
-      val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel)
-        Some(newHistogram("FollowerCatchUpTimeInNs", biased = true))
-      else None
-
-      /*
-       * Note that throughput is updated on individual key satisfaction.
-       * Therefore, it is an upper bound on throughput since the
-       * DelayedProducerRequest may get expired.
-       */
-      val throughputMeter = newMeter("Throughput-" + keyLabel, "bytes", TimeUnit.SECONDS)
-      val expiredRequestMeter = newMeter("ExpiredRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
-
-      val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
-        Some(newMeter("SatisfiedRequestsPerSecond", "requests", TimeUnit.SECONDS))
-      else None
-      val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
-        Some(newHistogram("SatisfactionTimeInNs", biased = true))
-      else None
+      val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
     }
 
 
-    private class DelayedFetchRequestMetrics(forFollower: Boolean,
-                                             keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
-      private val metricPrefix = if (forFollower) "Follower" else "NonFollower"
-
-      val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
-        Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond",
-          "requests", TimeUnit.SECONDS))
-      else None
-
-      val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
-        Some(newHistogram(metricPrefix + "-SatisfactionTimeInNs", biased = true))
-      else None
-
-      val expiredRequestMeter = if (keyLabel == MetricKey.globalLabel)
-        Some(newMeter(metricPrefix + "-ExpiredRequestsPerSecond",
-          "requests", TimeUnit.SECONDS))
-      else None
+    private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
+      private val metricPrefix = if (forFollower) "Follower" else "Consumer"
 
-      val throughputMeter = newMeter("%s-Throughput-%s".format(metricPrefix, keyLabel),
-        "bytes", TimeUnit.SECONDS)
+      val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
     }
 
     private val producerRequestMetricsForKey = {
-      val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel)
+      val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
       new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory))
     }
 
@@ -684,74 +635,16 @@ class KafkaApis(val requestChannel: Requ
     private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
     private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
 
-    private val followerFetchRequestMetricsForKey = {
-      val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = true, k.keyLabel)
-      new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
-    }
-
-    private val nonFollowerFetchRequestMetricsForKey = {
-      val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = false, k.keyLabel)
-      new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
-    }
-
     def recordDelayedProducerKeyExpired(key: MetricKey) {
       val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
       List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
     }
 
-
-    def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) {
-      val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
-      List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => {
-        m.caughtUpFollowerFetchRequestMeter.mark()
-        m.followerCatchUpTimeHistogram.foreach(_.update(timeToCatchUpNs))
-        m.throughputMeter.mark(bytes)
-      })
-    }
-
-
-    def recordDelayedProduceSatisfied(timeToSatisfyNs: Long) {
-      aggregateProduceRequestMetrics.satisfiedRequestMeter.foreach(_.mark())
-      aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
-    }
-
-    private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) {
-      val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
-        else aggregateNonFollowerFetchRequestMetrics
-      metrics.throughputMeter.mark(response.sizeInBytes)
-
-      response.topicMap.foreach(topicAndData => {
-        val topic = topicAndData._1
-        topicAndData._2.partitionDataArray.foreach(partitionData => {
-          val key = RequestKey(topic, partitionData.partition)
-          val keyMetrics = if (forFollower)
-            followerFetchRequestMetricsForKey.getAndMaybePut(key)
-          else
-            nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key)
-          keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
-        })
-      })
-    }
-
-
-    def recordDelayedFetchExpired(forFollower: Boolean, response: FetchResponse) {
+    def recordDelayedFetchExpired(forFollower: Boolean) {
       val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
         else aggregateNonFollowerFetchRequestMetrics
       
-      metrics.expiredRequestMeter.foreach(_.mark())
-
-      recordDelayedFetchThroughput(forFollower, response)
-    }
-
-
-    def recordDelayedFetchSatisfied(forFollower: Boolean, timeToSatisfyNs: Long, response: FetchResponse) {
-      val aggregateMetrics = if (forFollower) aggregateFollowerFetchRequestMetrics
-        else aggregateNonFollowerFetchRequestMetrics
-
-      aggregateMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
-      aggregateMetrics.satisfiedRequestMeter.foreach(_.mark())
-
-      recordDelayedFetchThroughput(forFollower, response)
+      metrics.expiredRequestMeter.mark()
     }
   }
 }
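
The trimmed-down DelayedRequestMetrics keeps one idiom worth noting: mark a
per-(topic, partition) metric and the global aggregate in a single pass, as
recordDelayedProducerKeyExpired does. The same shape with plain AtomicLong
counters (illustrative only):

    import java.util.concurrent.ConcurrentHashMap
    import java.util.concurrent.atomic.AtomicLong

    object ExpirySketch {
      private val perKey = new ConcurrentHashMap[(String, Int), AtomicLong]()
      private val aggregate = new AtomicLong(0L)

      def recordExpired(topic: String, partition: Int) {
        val key = (topic, partition)
        perKey.putIfAbsent(key, new AtomicLong(0L))
        // bump the per-key counter and the global aggregate together
        List(perKey.get(key), aggregate).foreach(_.incrementAndGet())
      }
    }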