Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/13 06:27:16 UTC

svn commit: r1384202 [2/2] - in /incubator/kafka/branches/0.8: core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/ core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/ core/src/main/scala/kafka/log/ core/src/main/scala/k...

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Thu Sep 13 04:27:13 2012
@@ -28,8 +28,10 @@ import org.apache.zookeeper.Watcher.Even
 import collection.JavaConversions._
 import kafka.utils.{ShutdownableThread, ZkUtils, Logging}
 import java.lang.Object
-import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
-import kafka.common.{KafkaException, PartitionOfflineException}
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
+import kafka.common.{PartitionOfflineException, KafkaException}
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
 
 
 class RequestSendThread(val controllerId: Int,
@@ -52,9 +54,9 @@ class RequestSendThread(val controllerId
         receive = channel.receive()
         var response: RequestOrResponse = null
         request.requestId.get match {
-          case RequestKeys.LeaderAndISRRequest =>
+          case RequestKeys.LeaderAndIsrKey =>
             response = LeaderAndISRResponse.readFrom(receive.buffer)
-          case RequestKeys.StopReplicaRequest =>
+          case RequestKeys.StopReplicaKey =>
             response = StopReplicaResponse.readFrom(receive.buffer)
         }
         trace("got a response %s".format(controllerId, response, toBrokerId))
@@ -144,7 +146,7 @@ case class ControllerBrokerStateInfo(cha
                                      messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
                                      requestSendThread: RequestSendThread)
 
-class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
+class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup {
   this.logIdent = "[Controller " + config.brokerId + "], "
   private var isRunning = true
   private val controllerLock = new Object
@@ -155,6 +157,13 @@ class KafkaController(config : KafkaConf
   private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
   private var allLeaders: mutable.Map[(String, Int), Int] = null
 
+  newGauge(
+    "ActiveControllerCount",
+    new Gauge[Int] {
+      def value() = if (isActive) 1 else 0
+    }
+  )
+
   // Return true if this controller succeeds in the controller leader election
   private def tryToBecomeController(): Boolean = {
     val controllerStatus =
@@ -369,6 +378,7 @@ class KafkaController(config : KafkaConf
       }
       else{
         warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), liveBrokerIds))
+        ControllerStat.offlinePartitionRate.mark()
       }
     }
 
@@ -479,10 +489,13 @@ class KafkaController(config : KafkaConf
           debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
             .format(liveAssignedReplicasToThisPartition.mkString(",")))
           liveAssignedReplicasToThisPartition.isEmpty match {
-            case true => throw new PartitionOfflineException(("No replica for partition " +
-              "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) +
-              " Assigned replicas are: [%s]".format(assignedReplicas))
+            case true =>
+              ControllerStat.offlinePartitionRate.mark()
+              throw new PartitionOfflineException(("No replica for partition " +
+                "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) +
+                " Assigned replicas are: [%s]".format(assignedReplicas))
             case false =>
+              ControllerStat.uncleanLeaderElectionRate.mark()
               val newLeader = liveAssignedReplicasToThisPartition.head
               warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
                 "There's potential data loss")
@@ -509,18 +522,20 @@ class KafkaController(config : KafkaConf
   class BrokerChangeListener() extends IZkChildListener with Logging {
     this.logIdent = "[Controller " + config.brokerId + "], "
     def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
-      controllerLock synchronized {
-        val curBrokerIds = currentBrokerList.map(_.toInt).toSet
-        val newBrokerIds = curBrokerIds -- liveBrokerIds
-        val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
-        val deletedBrokerIds = liveBrokerIds -- curBrokerIds
-        liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
-        liveBrokerIds = liveBrokers.map(_.id)
-        info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
-          .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(",")))
-        newBrokers.foreach(controllerChannelManager.addBroker(_))
-        deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
-        onBrokerChange(newBrokerIds)
+      ControllerStat.leaderElectionTimer.time {
+        controllerLock synchronized {
+          val curBrokerIds = currentBrokerList.map(_.toInt).toSet
+          val newBrokerIds = curBrokerIds -- liveBrokerIds
+          val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+          val deletedBrokerIds = liveBrokerIds -- curBrokerIds
+          liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+          liveBrokerIds = liveBrokers.map(_.id)
+          info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
+            .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(",")))
+          newBrokers.foreach(controllerChannelManager.addBroker(_))
+          deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
+          onBrokerChange(newBrokerIds)
+        }
       }
     }
   }
@@ -591,4 +606,10 @@ class KafkaController(config : KafkaConf
       }
     }
   }
-}
\ No newline at end of file
+}
+
+object ControllerStat extends KafkaMetricsGroup {
+  val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS)
+  val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
+  val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+}

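For reference, the KafkaMetricsGroup pattern this commit introduces composes as in the
following minimal sketch. ExampleStats and its metric names are illustrative only, not
part of this commit; the newGauge/newMeter/newTimer signatures and the KafkaTimer.time
block mirror their uses above.

    import java.util.concurrent.TimeUnit
    import com.yammer.metrics.core.Gauge
    import kafka.metrics.{KafkaMetricsGroup, KafkaTimer}

    object ExampleStats extends KafkaMetricsGroup {
      @volatile var pending = 0
      // A gauge is sampled each time a reporter reads it.
      newGauge("PendingRequests", new Gauge[Int] { def value() = pending })
      // A meter tracks an event rate; mark() it on each occurrence.
      val requestRate = newMeter("RequestsPerSec", "requests", TimeUnit.SECONDS)
      // A timer tracks rate and latency together; KafkaTimer adds a time {...}
      // block that times the enclosed code and passes its result through.
      val requestTimer = new KafkaTimer(
        newTimer("RequestTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
    }

    // ExampleStats.requestRate.mark()
    // ExampleStats.requestTimer.time { /* timed work */ }
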
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Thu Sep 13 04:27:13 2012
@@ -19,7 +19,8 @@ package kafka.server
 
 import kafka.network._
 import kafka.utils._
-import java.util.concurrent.atomic.AtomicLong
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
 
 /**
  * A thread that answers kafka requests.
@@ -30,10 +31,11 @@ class KafkaRequestHandler(id: Int, broke
   def run() { 
     while(true) { 
       val req = requestChannel.receiveRequest()
-      if(req == RequestChannel.AllDone){
+      if(req eq RequestChannel.AllDone){
         trace("receives shut down command, shut down".format(brokerId, id))
         return
       }
+      req.dequeueTimeNs = SystemTime.nanoseconds
       debug("handles request " + req)
       apis.handle(req)
     }
@@ -63,62 +65,24 @@ class KafkaRequestHandlerPool(val broker
       thread.join
     info("shutted down completely")
   }
-  
 }
 
-trait BrokerTopicStatMBean {
-  def getMessagesIn: Long
-  def getBytesIn: Long
-  def getBytesOut: Long
-  def getFailedProduceRequest: Long
-  def getFailedFetchRequest: Long
+class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
+  val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS)
+  val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS)
+  val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS)
+  val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
+  val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS)
 }
 
-@threadsafe
-class BrokerTopicStat extends BrokerTopicStatMBean {
-  private val numCumulatedMessagesIn = new AtomicLong(0)
-  private val numCumulatedBytesIn = new AtomicLong(0)
-  private val numCumulatedBytesOut = new AtomicLong(0)
-  private val numCumulatedFailedProduceRequests = new AtomicLong(0)
-  private val numCumulatedFailedFetchRequests = new AtomicLong(0)
-
-  def getMessagesIn: Long = numCumulatedMessagesIn.get
-
-  def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages)
-
-  def getBytesIn: Long = numCumulatedBytesIn.get
-
-  def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes)
-
-  def getBytesOut: Long = numCumulatedBytesOut.get
-
-  def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)
-
-  def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement
-
-  def getFailedProduceRequest = numCumulatedFailedProduceRequests.get()
+object BrokerTopicStat extends Logging {
+  private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
+  private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
+  private val allTopicStat = new BrokerTopicMetrics("AllTopics")
 
-  def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement
+  def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat
 
-  def getFailedFetchRequest = numCumulatedFailedFetchRequests.get()
-}
-
-object BrokerTopicStat extends Logging {
-  private val stats = new Pool[String, BrokerTopicStat]
-  private val allTopicStat = new BrokerTopicStat
-  Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat")
-
-  def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat
-
-  def getBrokerTopicStat(topic: String): BrokerTopicStat = {
-    var stat = stats.get(topic)
-    if (stat == null) {
-      stat = new BrokerTopicStat
-      if (stats.putIfNotExists(topic, stat) == null)
-        Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic)
-      else
-        stat = stats.get(topic)
-    }
-    return stat
+  def getBrokerTopicStat(topic: String): BrokerTopicMetrics = {
+    stats.getAndMaybePut(topic + "-")
   }
 }

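On the request path, these per-topic meters replace the old cumulative-counter MBeans.
A hedged sketch of how they are updated (messageCount and payloadBytes are illustrative
locals, not code from this commit):

    // Per-topic metrics are created lazily via Pool.getAndMaybePut.
    val messageCount = 10L   // illustrative
    val payloadBytes = 4096L // illustrative
    val topicStat = BrokerTopicStat.getBrokerTopicStat("my-topic")
    topicStat.messagesInRate.mark(messageCount)
    topicStat.bytesInRate.mark(payloadBytes)
    // The "AllTopics" aggregate is a separate BrokerTopicMetrics instance
    // and is updated alongside the per-topic one.
    BrokerTopicStat.getBrokerAllTopicStat().messagesInRate.mark(messageCount)
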
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Thu Sep 13 04:27:13 2012
@@ -18,7 +18,7 @@
 package kafka.server
 
 import java.io.File
-import kafka.network.{SocketServerStats, SocketServer}
+import kafka.network.SocketServer
 import kafka.log.LogManager
 import kafka.utils._
 import java.util.concurrent._
@@ -34,7 +34,6 @@ class KafkaServer(val config: KafkaConfi
   val CleanShutdownFile = ".kafka_cleanshutdown"
   private var isShuttingDown = new AtomicBoolean(false)
   private var shutdownLatch = new CountDownLatch(1)
-  private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
   var socketServer: SocketServer = null
   var requestHandlerPool: KafkaRequestHandlerPool = null
   var logManager: LogManager = null
@@ -82,8 +81,6 @@ class KafkaServer(val config: KafkaConfi
 
     socketServer.startup
 
-    Utils.registerMBean(socketServer.stats, statsMBeanName)
-
     /* start client */
     kafkaZookeeper = new KafkaZooKeeper(config)
     // starting relevant replicas and leader election for partitions assigned to this broker
@@ -123,7 +120,6 @@ class KafkaServer(val config: KafkaConfi
         replicaManager.shutdown()
       if (socketServer != null)
         socketServer.shutdown()
-      Utils.unregisterMBean(statsMBeanName)
       if(logManager != null)
         logManager.shutdown()
 
@@ -144,8 +140,6 @@ class KafkaServer(val config: KafkaConfi
   def awaitShutdown(): Unit = shutdownLatch.await()
 
   def getLogManager(): LogManager = logManager
-
-  def getStats(): SocketServerStats = socketServer.stats
 }
 
 

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Thu Sep 13 04:27:13 2012
@@ -24,13 +24,16 @@ import kafka.utils._
 import kafka.log.LogManager
 import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr}
 import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.TimeUnit
 
 object ReplicaManager {
   val UnknownLogEndOffset = -1L
 }
 
 class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler,
-                     val logManager: LogManager) extends Logging {
+                     val logManager: LogManager) extends Logging with KafkaMetricsGroup {
   private val allPartitions = new Pool[(String, Int), Partition]
   private var leaderPartitions = new mutable.HashSet[Partition]()
   private val leaderPartitionsLock = new Object
@@ -41,6 +44,26 @@ class ReplicaManager(val config: KafkaCo
   val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
   info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
 
+  newGauge(
+    "LeaderCount",
+    new Gauge[Int] {
+      def value() = leaderPartitions.size
+    }
+  )
+  newGauge(
+    "UnderReplicatedPartitions",
+    new Gauge[Int] {
+      def value() = {
+        leaderPartitionsLock synchronized {
+          leaderPartitions.count(_.isUnderReplicated)
+        }
+      }
+    }
+  )
+  val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
+  val isrShrinkRate = newMeter("IsrShrinksPerSec", "shrinks", TimeUnit.SECONDS)
+
+
   def startHighWaterMarksCheckPointThread() = {
     if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
       kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)

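Note that the UnderReplicatedPartitions gauge reads leaderPartitions under
leaderPartitionsLock because value() is invoked from metrics-reporter threads, not only
from the replica manager. Assuming KafkaMetricsGroup registers against yammer's default
registry, the new gauges and meters can be inspected like this (a hedged sketch, not
code from this commit):

    import com.yammer.metrics.Metrics
    import scala.collection.JavaConversions._

    // List every registered metric, e.g. LeaderCount and UnderReplicatedPartitions.
    for ((name, metric) <- Metrics.defaultRegistry().allMetrics())
      println(name + " -> " + metric)
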
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Thu Sep 13 04:27:13 2012
@@ -33,7 +33,6 @@ import kafka.metrics.KafkaMetricsGroup
  * for example a key could be a (topic, partition) pair.
  */
 class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
-  val creationTimeNs = SystemTime.nanoseconds
   val satisfied = new AtomicBoolean(false)
 }
 
@@ -67,32 +66,13 @@ abstract class RequestPurgatory[T <: Del
   /* a list of requests watching each key */
   private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
 
-  private val numDelayedRequestsBeanName = "NumDelayedRequests"
-  private val timeToSatisfyHistogramBeanName = "TimeToSatisfyInNs"
-  private val satisfactionRateBeanName = "SatisfactionRate"
-  private val expirationRateBeanName = "ExpirationRate"
-
-  val satisfactionRateMeter = newMeter(
-      satisfactionRateBeanName,
-      "requests",
-      TimeUnit.SECONDS
-    )
-
-  val timeToSatisfyHistogram = newHistogram(timeToSatisfyHistogramBeanName, biased = true)
-
   newGauge(
-    numDelayedRequestsBeanName,
+    "NumDelayedRequests",
     new Gauge[Int] {
       def value() = expiredRequestReaper.unsatisfied.get()
     }
   )
 
-  val expirationRateMeter = newMeter(
-    expirationRateBeanName,
-    "requests",
-    TimeUnit.SECONDS
-  )
-
   /* background thread expiring requests that have been waiting too long */
   private val expiredRequestReaper = new ExpiredRequestReaper
   private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
@@ -196,10 +176,6 @@ abstract class RequestPurgatory[T <: Del
               iter.remove()
               val updated = curr.satisfied.compareAndSet(false, true)
               if(updated == true) {
-                val requestNs = SystemTime.nanoseconds - curr.creationTimeNs
-                satisfactionRateMeter.mark()
-                timeToSatisfyHistogram.update(requestNs)
-
                 response += curr
                 liveCount -= 1
                 expiredRequestReaper.satisfyRequest()
@@ -282,7 +258,6 @@ abstract class RequestPurgatory[T <: Del
         val curr = delayed.take()
         val updated = curr.satisfied.compareAndSet(false, true)
         if(updated) {
-          expirationRateMeter.mark()
           unsatisfied.getAndDecrement()
           for(key <- curr.keys)
             watchersFor(key).decLiveCount()

Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala Thu Sep 13 04:27:13 2012
@@ -21,12 +21,14 @@ import java.util.ArrayList
 import java.util.concurrent._
 import collection.JavaConversions
 import kafka.common.KafkaException
+import java.lang.Object
 
 
 class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] {
 
   private val pool = new ConcurrentHashMap[K, V]
-  
+  private val createLock = new Object
+
   def this(m: collection.Map[K, V]) {
     this()
     m.foreach(kv => pool.put(kv._1, kv._2))
@@ -52,8 +54,12 @@ class Pool[K,V](valueFactory: Option[(K)
       throw new KafkaException("Empty value factory in pool.")
     val curr = pool.get(key)
     if (curr == null) {
-      pool.putIfAbsent(key, valueFactory.get(key))
-      pool.get(key)
+      createLock synchronized {
+        val curr = pool.get(key)
+        if (curr == null)
+          pool.put(key, valueFactory.get(key))
+        pool.get(key)
+      }
     }
     else
       curr

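The getAndMaybePut change matters for the metrics above: with the old putIfAbsent form,
two racing threads could both evaluate valueFactory for the same key and one result was
silently discarded, which is harmful when the factory has side effects such as
registering metrics (as BrokerTopicMetrics does). The createLock version keeps reads
lock-free and runs the factory at most once per key. A small sketch of the intended
behavior:

    // Reads hit the ConcurrentHashMap without locking; only creation synchronizes.
    val pool = new Pool[String, StringBuilder](Some((k: String) => new StringBuilder(k)))
    val a = pool.getAndMaybePut("key")  // factory runs once; value is cached
    val b = pool.getAndMaybePut("key")  // cached value returned; factory not re-run
    assert(a eq b)
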
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Sep 13 04:27:13 2012
@@ -20,7 +20,6 @@ package kafka.utils
 import java.io._
 import java.nio._
 import java.nio.channels._
-import java.util.concurrent.atomic._
 import java.lang.management._
 import java.util.zip.CRC32
 import javax.management._
@@ -685,100 +684,6 @@ object Utils extends Logging {
       for (forever <- Stream.continually(1); t <- coll) yield t
     stream.iterator
   }
-
-}
-
-class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
-  private val time: Time = SystemTime
-
-  private val complete = new AtomicReference(new Stats())
-  private val current = new AtomicReference(new Stats())
-  private val total = new AtomicLong(0)
-  private val numCumulatedRequests = new AtomicLong(0)
-
-  def recordRequestMetric(requestNs: Long) {
-    val stats = current.get
-    stats.add(requestNs)
-    total.getAndAdd(requestNs)
-    numCumulatedRequests.getAndAdd(1)
-    val ageNs = time.nanoseconds - stats.start
-    // if the current stats are too old it is time to swap
-    if(ageNs >= monitorDurationNs) {
-      val swapped = current.compareAndSet(stats, new Stats())
-      if(swapped) {
-        complete.set(stats)
-        stats.end.set(time.nanoseconds)
-      }
-    }
-  }
-
-  def recordThroughputMetric(data: Long) {
-    val stats = current.get
-    stats.addData(data)
-    val ageNs = time.nanoseconds - stats.start
-    // if the current stats are too old it is time to swap
-    if(ageNs >= monitorDurationNs) {
-      val swapped = current.compareAndSet(stats, new Stats())
-      if(swapped) {
-        complete.set(stats)
-        stats.end.set(time.nanoseconds)
-      }
-    }
-  }
-
-  def getNumRequests(): Long = numCumulatedRequests.get
-
-  def getRequestsPerSecond: Double = {
-    val stats = complete.get
-    stats.numRequests / stats.durationSeconds
-  }
-
-  def getThroughput: Double = {
-    val stats = complete.get
-    stats.totalData / stats.durationSeconds
-  }
-
-  def getAvgMetric: Double = {
-    val stats = complete.get
-    if (stats.numRequests == 0) {
-      0
-    }
-    else {
-      stats.totalRequestMetric / stats.numRequests
-    }
-  }
-
-  def getTotalMetric: Long = total.get
-
-  def getMaxMetric: Double = complete.get.maxRequestMetric
-
-  class Stats {
-    val start = time.nanoseconds
-    var end = new AtomicLong(-1)
-    var numRequests = 0
-    var totalRequestMetric: Long = 0L
-    var maxRequestMetric: Long = 0L
-    var totalData: Long = 0L
-    private val lock = new Object()
-
-    def addData(data: Long) {
-      lock synchronized {
-        totalData += data
-      }
-    }
-
-    def add(requestNs: Long) {
-      lock synchronized {
-        numRequests +=1
-        totalRequestMetric += requestNs
-        maxRequestMetric = scala.math.max(maxRequestMetric, requestNs)
-      }
-    }
-
-    def durationSeconds: Double = (end.get - start) / (1000.0 * 1000.0 * 1000.0)
-
-    def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
-  }
 }
 
 /**

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Thu Sep 13 04:27:13 2012
@@ -29,8 +29,7 @@ import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
 import kafka.common.ErrorMapping
-import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
-
+import kafka.api.{RequestKeys, TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
 
 class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
   val props = createBrokerConfigs(1)
@@ -106,29 +105,20 @@ class TopicMetadataTest extends JUnit3Su
     // create a topic metadata request
     val topicMetadataRequest = new TopicMetadataRequest(List(topic))
 
-    val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
-    topicMetadataRequest.writeTo(serializedMetadataRequest)
-    serializedMetadataRequest.rewind()
+    val serializedMetadataRequest = TestUtils.createRequestByteBuffer(topicMetadataRequest)
 
     // create the kafka request handler
     val requestChannel = new RequestChannel(2, 5)
     val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
 
-    // mock the receive API to return the request buffer as created above
-    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
-    EasyMock.expect(receivedRequest.buffer).andReturn(serializedMetadataRequest)
-    EasyMock.replay(receivedRequest)
-
     // call the API (to be tested) to get metadata
-    apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
-    val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[BoundedByteBufferSend].buffer
+    apis.handleTopicMetadataRequest(new RequestChannel.Request
+      (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeNs=1))
+    val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
     
     // check assertions
     val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
 
-    // verify the expected calls to log manager occurred in the right order
-    EasyMock.verify(receivedRequest)
-
     topicMetadata
   }
 }
\ No newline at end of file

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Thu Sep 13 04:27:13 2012
@@ -24,6 +24,9 @@ import org.scalatest.junit.JUnitSuite
 import kafka.utils.TestUtils
 import java.util.Random
 import junit.framework.Assert._
+import kafka.producer.SyncProducerConfig
+import kafka.api.{TopicData, ProducerRequest}
+import java.nio.ByteBuffer
 
 class SocketServerTest extends JUnitSuite {
 
@@ -54,9 +57,9 @@ class SocketServerTest extends JUnitSuit
   /* A simple request handler that just echoes back the request */
   def processRequest(channel: RequestChannel) {
     val request = channel.receiveRequest
-    val id = request.request.buffer.getShort
-    val send = new BoundedByteBufferSend(request.request.buffer.slice)
-    channel.sendResponse(new RequestChannel.Response(request.processor, request.requestKey, send, request.start, 15))
+    val id = request.buffer.getShort
+    val send = new BoundedByteBufferSend(request.buffer.slice)
+    channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
   }
 
   def connect() = new Socket("localhost", server.port)
@@ -69,10 +72,21 @@ class SocketServerTest extends JUnitSuit
   @Test
   def simpleRequest() {
     val socket = connect()
-    sendRequest(socket, 0, "hello".getBytes)
+    val correlationId = SyncProducerConfig.DefaultCorrelationId
+    val clientId = SyncProducerConfig.DefaultClientId
+    val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+    val ack = SyncProducerConfig.DefaultRequiredAcks
+    val emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+
+    val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes())
+    emptyRequest.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    val serializedBytes = new Array[Byte](byteBuffer.remaining)
+    byteBuffer.get(serializedBytes)
+
+    sendRequest(socket, 0, serializedBytes)
     processRequest(server.requestChannel)
-    val response = new String(receiveResponse(socket))
-    assertEquals("hello", response)
+    assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
   }
 
   @Test(expected=classOf[IOException])

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Thu Sep 13 04:27:13 2012
@@ -16,16 +16,15 @@
  */
 package kafka.server
 
-import java.nio.ByteBuffer
-import kafka.api.{FetchRequest, FetchRequestBuilder}
 import kafka.cluster.{Partition, Replica}
 import kafka.log.Log
 import kafka.message.{ByteBufferMessageSet, Message}
-import kafka.network.{BoundedByteBufferReceive, RequestChannel}
+import kafka.network.RequestChannel
 import kafka.utils.{Time, TestUtils, MockTime}
 import org.easymock.EasyMock
 import org.I0Itec.zkclient.ZkClient
 import org.scalatest.junit.JUnit3Suite
+import kafka.api.{FetchRequest, FetchRequestBuilder}
 
 class SimpleFetchTest extends JUnit3Suite {
 
@@ -92,16 +91,10 @@ class SimpleFetchTest extends JUnit3Suit
       .replicaId(FetchRequest.NonFollowerId)
       .addFetch(topic, partitionId, 0, hw*2)
       .build()
-    val goodFetchBB = ByteBuffer.allocate(goodFetch.sizeInBytes)
-    goodFetch.writeTo(goodFetchBB)
-    goodFetchBB.rewind()
-
-    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
-    EasyMock.expect(receivedRequest.buffer).andReturn(goodFetchBB)
-    EasyMock.replay(receivedRequest)
+    val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
 
     // send the request
-    apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, request=receivedRequest, start=1))
+    apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeNs=1))
 
     // make sure the log only reads bytes between 0->HW (5)
     EasyMock.verify(log)
@@ -170,16 +163,10 @@ class SimpleFetchTest extends JUnit3Suit
       .addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE)
       .build()
 
-    val fetchRequest = ByteBuffer.allocate(bigFetch.sizeInBytes)
-    bigFetch.writeTo(fetchRequest)
-    fetchRequest.rewind()
-
-    val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
-    EasyMock.expect(receivedRequest.buffer).andReturn(fetchRequest)
-    EasyMock.replay(receivedRequest)
+    val fetchRequestBB = TestUtils.createRequestByteBuffer(bigFetch)
 
     // send the request
-    apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
+    apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeNs=1))
 
     /**
      * Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after

Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Thu Sep 13 04:27:13 2012
@@ -468,6 +468,14 @@ object TestUtils extends Logging {
         }
     }
   }
+
+  def createRequestByteBuffer(request: RequestOrResponse): ByteBuffer = {
+    val byteBuffer = ByteBuffer.allocate(request.sizeInBytes + 2)
+    byteBuffer.putShort(request.requestId.get)
+    request.writeTo(byteBuffer)
+    byteBuffer.rewind()
+    byteBuffer
+  }
 }
 
 object TestZKUtils {

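createRequestByteBuffer allocates sizeInBytes + 2 so the two-byte request id can precede
the serialized payload, matching the framing the request handlers expect. Usage, as in
the updated tests above:

    val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
    apis.handleFetchRequest(new RequestChannel.Request(
      processor = 1, requestKey = 5, buffer = goodFetchBB, startTimeNs = 1))
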
Modified: incubator/kafka/branches/0.8/system_test/metrics.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/metrics.json?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/metrics.json (original)
+++ incubator/kafka/branches/0.8/system_test/metrics.json Thu Sep 13 04:27:13 2012
@@ -4,136 +4,118 @@
             "role": "broker",
             "graphs": [
                { 
-                  "graph_name": "SocketServerThroughput",
-                  "y_label": "bytes-read-per-second,bytes-written-per-second",
-                  "bean_name": "kafka:type=kafka.SocketServerStats",
-                  "attributes": "BytesReadPerSecond,BytesWrittenPerSecond"
+                  "graph_name": "Produce-Request-Rate",
+                  "y_label": "requests-per-sec",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RequestsPerSec",
+                  "attributes": "OneMinuteRate"
                },
                { 
-                  "graph_name": "FetchRequestPurgatoryNumDelayedRequests",
-                  "y_label": "num-delayed-requests",
-                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
-                  "attributes": "Value"
+                  "graph_name": "Produce-Request-Time",
+                  "y_label": "ns,ns",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Produce-TotalTimeNs",
+                  "attributes": "Mean,99thPercentile"
                },
                { 
-                  "graph_name": "MeanFetchRequestPurgatorySatisfactionRate",
-                  "y_label": "mean-request-satisfaction-rate",
-                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=SatisfactionRate",
-                  "attributes": "MeanRate"
+                  "graph_name": "Produce-Request-Remote-Time",
+                  "y_label": "ns,ns",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RemoteTimeNs",
+                  "attributes": "Mean,99thPercentile"
                },
                { 
-                  "graph_name": "FetchRequestPurgatoryTimeToSatisfy",
-                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
-                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=TimeToSatisfyInNs",
-                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+                  "graph_name": "Fetch-Consumer-Request-Rate",
+                  "y_label": "requests-per-sec",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RequestsPerSec",
+                  "attributes": "OneMinuteRate"
                },
                { 
-                  "graph_name": "FetchRequestPurgatoryExpirationRate",
-                  "y_label": "expiration-rate",
-                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=ExpirationRate",
-                  "attributes": "MeanRate"
-               },
-               {
-                  "graph_name": "ProducerRequestPurgatoryNumDelayedRequests",
-                  "y_label": "num-delayed-requests",
-                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
-                  "attributes": "Value"
-               },
-               {
-                  "graph_name": "MeanProducerRequestPurgatorySatisfactionRate",
-                  "y_label": "mean-request-satisfaction-rate",
-                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=SatisfactionRate",
-                  "attributes": "MeanRate"
-               },
-               {
-                  "graph_name": "ProducerRequestPurgatoryExpirationRate",
-                  "y_label": "expiration-rate",
-                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=ExpirationRate",
-                  "attributes": "MeanRate"
+                  "graph_name": "Fetch-Consumer-Request-Time",
+                  "y_label": "ns,ns",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-TotalTimeNs",
+                  "attributes": "Mean,99thPercentile"
                },
-               {
-                  "graph_name": "DelayedProducerRequests-CaughtUpFollowerFetchRequestsPerSecond",
-                  "y_label": "mean-caught-up-follower-fetch-requests-per-second",
-                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=CaughtUpFollowerRequestsPerSecond-all",
-                  "attributes": "MeanRate"
+               { 
+                  "graph_name": "Fetch-Consumer-Request-Remote-Time",
+                  "y_label": "ns,ns",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RemoteTimeNs",
+                  "attributes": "Mean,99thPercentile"
                },
-               {
-                  "graph_name": "DelayedProducerRequests-ExpiredRequestRate",
-                  "y_label": "mean-expired-request-rate",
-                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=ExpiredRequestsPerSecond-all",
-                  "attributes": "MeanRate"
+               { 
+                  "graph_name": "Fetch-Follower-Request-Rate",
+                  "y_label": "requests-per-sec",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RequestsPerSec",
+                  "attributes": "OneMinuteRate"
                },
-               {
-                  "graph_name": "DelayedProducerRequests-FollowerCatchUpLatency",
-                  "y_label": "mean-follower-catchup-time-ns,95th-percentile-follower-catchup-time-ns,99th-percentile-follower-catchup-time-ns,999th-percentile-follower-catchup-time-ns",
-                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=FollowerCatchUpTimeInNs",
-                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               { 
+                  "graph_name": "Fetch-Follower-Request-Time",
+                  "y_label": "ns,ns",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-TotalTimeNs",
+                  "attributes": "Mean,99thPercentile"
                },
-               {
-                  "graph_name": "DelayedProducerRequests-SatisfactionTimeInNs",
-                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
-                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfactionTimeInNs",
-                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+               { 
+                  "graph_name": "Fetch-Follower-Request-Remote-Time",
+                  "y_label": "ns,ns",
+                  "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RemoteTimeNs",
+                  "attributes": "Mean,99thPercentile"
                },
-               {
-                  "graph_name": "DelayedProducerRequests-SatisfiedRequestsPerSecond",
-                  "y_label": "mean-satisfaction-requests-per-second",
-                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfiedRequestsPerSecond",
-                  "attributes": "MeanRate"
+               { 
+                  "graph_name": "ProducePurgatoryExpirationRate",
+                  "y_label": "expirations-per-sec",
+                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=AllExpiresPerSecond",
+                  "attributes": "OneMinuteRate"
                },
-               {
-                  "graph_name": "DelayedProducerRequests-Throughput-all",
-                  "y_label": "mean-purgatory-throughput-all",
-                  "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=Throughput-all",
-                  "attributes": "MeanRate"
+               { 
+                  "graph_name": "FetchConsumerPurgatoryExpirationRate",
+                  "y_label": "expirations-per-sec",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=ConsumerExpiresPerSecond",
+                  "attributes": "OneMinuteRate"
                },
                {
-                  "graph_name": "DelayedFetchRequests-Follower-ExpiredRequestRate",
-                  "y_label": "mean-expired-request-rate",
-                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-ExpiredRequestsPerSecond",
-                  "attributes": "MeanRate"
+                  "graph_name": "FetchFollowerPurgatoryExpirationRate",
+                  "y_label": "expirations-per-sec",
+                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=FollowerExpiresPerSecond",
+                  "attributes": "OneMinuteRate"
                },
                {
-                  "graph_name": "DelayedFetchRequests-Follower-SatisfactionTimeInNs",
-                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
-                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfactionTimeInNs",
-                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+                  "graph_name": "ProducePurgatoryQueueSize",
+                  "y_label": "size",
+                  "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
+                  "attributes": "Value"
                },
                {
-                  "graph_name": "DelayedProducerRequests-Follower-SatisfiedRequestsPerSecond",
-                  "y_label": "mean-satisfaction-requests-per-second",
-                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfiedRequestsPerSecond",
-                  "attributes": "MeanRate"
+                  "graph_name": "FetchPurgatoryQueueSize",
+                  "y_label": "size",
+                  "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
+                  "attributes": "Value"
                },
                {
-                  "graph_name": "DelayedFetchRequests-Follower-Throughput-all",
-                  "y_label": "mean-purgatory-throughput-all",
-                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-Throughput-all",
-                  "attributes": "MeanRate"
+                  "graph_name": "ControllerLeaderElectionRateAndTime",
+                  "y_label": "elections-per-sec,ms,ms",
+                  "bean_name": "kafka.server:type=ControllerStat,name=LeaderElectionRateAndTimeMs",
+                  "attributes": "OneMinuteRate,Mean,99thPercentile"
                },
                {
-                  "graph_name": "DelayedFetchRequests-NonFollower-ExpiredRequestRate",
-                  "y_label": "mean-expired-request-rate",
-                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-ExpiredRequestsPerSecond",
-                  "attributes": "MeanRate"
+                  "graph_name": "LogFlushRateAndTime",
+                  "y_label": "flushes-per-sec,ms,ms",
+                  "bean_name": "kafka.message:type=LogFlushStats,name=LogFlushRateAndTimeMs",
+                  "attributes": "OneMinuteRate,Mean,99thPercentile"
                },
                {
-                  "graph_name": "DelayedFetchRequests-NonFollower-SatisfactionTimeInNs",
-                  "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
-                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfactionTimeInNs",
-                  "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+                  "graph_name": "AllBytesOutRate",
+                  "y_label": "bytes-per-sec",
+                  "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesOutPerSec",
+                  "attributes": "OneMinuteRate"
                },
                {
-                  "graph_name": "DelayedFetchRequests-NonFollower-SatisfiedRequestsPerSecond",
-                  "y_label": "mean-satisfaction-requests-per-second",
-                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfiedRequestsPerSecond",
-                  "attributes": "MeanRate"
+                  "graph_name": "AllBytesInRate",
+                  "y_label": "bytes-per-sec",
+                  "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec",
+                  "attributes": "OneMinuteRate"
                },
                {
-                  "graph_name": "DelayedFetchRequests-NonFollower-Throughput-all",
-                  "y_label": "mean-purgatory-throughput-all",
-                  "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-Throughput-all",
-                  "attributes": "MeanRate"
+                  "graph_name": "AllMessagesInRate",
+                  "y_label": "messages-per-sec",
+                  "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsMessagesInPerSec",
+                  "attributes": "OneMinuteRate"
                }
              ]
        },
@@ -141,10 +123,16 @@
             "role": "producer_performance",
             "graphs": [
                {
-                  "graph_name": "ProducerStats",
-                  "y_label": "avg-producer-latency-ms,max-producer-latency-ms,produce-request-throughput",
-                  "bean_name": "kafka:type=kafka.KafkaProducerStats",
-                  "attributes": "AvgProduceRequestMs,MaxProduceRequestMs,ProduceRequestsPerSecond"
+                  "graph_name": "ProduceRequestRateAndTime",
+                  "y_label": "requests-per-sec,ms,ms",
+                  "bean_name": "kafka.producer:type=ProducerRequestStat,name=ProduceRequestRateAndTimeMs",
+                  "attributes": "OneMinuteRate,Mean,99thPercentile"
+               },
+               {
+                  "graph_name": "ProduceRequestSize",
+                  "y_label": "bytes,bytes",
+                  "bean_name": "kafka.producer:type=ProducerRequestStat,name=ProducerRequestSize",
+                  "attributes": "Mean,99thPercentile"
                }
              ]
        },
@@ -152,10 +140,22 @@
             "role": "console_consumer",
             "graphs": [
                {
-                  "graph_name": "SimpleConsumerRequestStats",
-                  "y_label": "simple-consumer-throughput,simple-consumer-throughput-bytes,simple-consumer-latency-ms",
-                  "bean_name": "kafka:type=kafka.SimpleConsumerStats",
-                  "attributes": "FetchRequestsPerSecond,ConsumerThroughput,AvgFetchRequestMs"
+                  "graph_name": "FetchRequestRateAndTime",
+                  "y_label": "requests-per-sec,ms,ms",
+                  "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchRequestRateAndTimeMs",
+                  "attributes": "OneMinuteRate,Mean,99thPercentile"
+               },
+               {
+                  "graph_name": "FetchResponseSize",
+                  "y_label": "bytes,bytes",
+                  "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchResponseSize",
+                  "attributes": "Mean,99thPercentile"
+               },
+               {
+                  "graph_name": "ConsumedMessageRate",
+                  "y_label": "messages-per-sec",
+                  "bean_name": "kafka.consumer:type=ConsumerTopicStat,name=AllTopicsMessagesPerSec",
+                  "attributes": "OneMinuteRate"
                }
              ]
        },