You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/13 06:27:16 UTC
svn commit: r1384202 [2/2] - in /incubator/kafka/branches/0.8:
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/
core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/
core/src/main/scala/kafka/log/ core/src/main/scala/k...
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala Thu Sep 13 04:27:13 2012
@@ -28,8 +28,10 @@ import org.apache.zookeeper.Watcher.Even
import collection.JavaConversions._
import kafka.utils.{ShutdownableThread, ZkUtils, Logging}
import java.lang.Object
-import java.util.concurrent.{LinkedBlockingQueue, BlockingQueue}
-import kafka.common.{KafkaException, PartitionOfflineException}
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.{TimeUnit, LinkedBlockingQueue, BlockingQueue}
+import kafka.common.{PartitionOfflineException, KafkaException}
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
class RequestSendThread(val controllerId: Int,
@@ -52,9 +54,9 @@ class RequestSendThread(val controllerId
receive = channel.receive()
var response: RequestOrResponse = null
request.requestId.get match {
- case RequestKeys.LeaderAndISRRequest =>
+ case RequestKeys.LeaderAndIsrKey =>
response = LeaderAndISRResponse.readFrom(receive.buffer)
- case RequestKeys.StopReplicaRequest =>
+ case RequestKeys.StopReplicaKey =>
response = StopReplicaResponse.readFrom(receive.buffer)
}
trace("got a response %s".format(controllerId, response, toBrokerId))
@@ -144,7 +146,7 @@ case class ControllerBrokerStateInfo(cha
messageQueue: BlockingQueue[(RequestOrResponse, (RequestOrResponse) => Unit)],
requestSendThread: RequestSendThread)
-class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging {
+class KafkaController(config : KafkaConfig, zkClient: ZkClient) extends Logging with KafkaMetricsGroup{
this.logIdent = "[Controller " + config.brokerId + "], "
private var isRunning = true
private val controllerLock = new Object
@@ -155,6 +157,13 @@ class KafkaController(config : KafkaConf
private var allPartitionReplicaAssignment: mutable.Map[(String, Int), Seq[Int]] = null
private var allLeaders: mutable.Map[(String, Int), Int] = null
+ newGauge(
+ "ActiveControllerCount",
+ new Gauge[Int] {
+ def value() = if (isActive) 1 else 0
+ }
+ )
+
// Return true if this controller succeeds in the controller leader election
private def tryToBecomeController(): Boolean = {
val controllerStatus =
@@ -369,6 +378,7 @@ class KafkaController(config : KafkaConf
}
else{
warn("during initializing leader of parition (%s, %d), assigned replicas are [%s], live brokers are [%s], no assigned replica is alive".format(topicPartition._1, topicPartition._2, replicaAssignment.mkString(","), liveBrokerIds))
+ ControllerStat.offlinePartitionRate.mark()
}
}
@@ -479,10 +489,13 @@ class KafkaController(config : KafkaConf
debug("No broker is ISR is alive, picking the leader from the alive assigned replicas: %s"
.format(liveAssignedReplicasToThisPartition.mkString(",")))
liveAssignedReplicasToThisPartition.isEmpty match {
- case true => throw new PartitionOfflineException(("No replica for partition " +
- "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) +
- " Assigned replicas are: [%s]".format(assignedReplicas))
+ case true =>
+ ControllerStat.offlinePartitionRate.mark()
+ throw new PartitionOfflineException(("No replica for partition " +
+ "([%s, %d]) is alive. Live brokers are: [%s],".format(topic, partition, liveBrokerIds)) +
+ " Assigned replicas are: [%s]".format(assignedReplicas))
case false =>
+ ControllerStat.uncleanLeaderElectionRate.mark()
val newLeader = liveAssignedReplicasToThisPartition.head
warn("No broker in ISR is alive, elected leader from the alive replicas is [%s], ".format(newLeader) +
"There's potential data loss")
@@ -509,18 +522,20 @@ class KafkaController(config : KafkaConf
class BrokerChangeListener() extends IZkChildListener with Logging {
this.logIdent = "[Controller " + config.brokerId + "], "
def handleChildChange(parentPath : String, currentBrokerList : java.util.List[String]) {
- controllerLock synchronized {
- val curBrokerIds = currentBrokerList.map(_.toInt).toSet
- val newBrokerIds = curBrokerIds -- liveBrokerIds
- val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
- val deletedBrokerIds = liveBrokerIds -- curBrokerIds
- liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
- liveBrokerIds = liveBrokers.map(_.id)
- info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
- .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(",")))
- newBrokers.foreach(controllerChannelManager.addBroker(_))
- deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
- onBrokerChange(newBrokerIds)
+ ControllerStat.leaderElectionTimer.time {
+ controllerLock synchronized {
+ val curBrokerIds = currentBrokerList.map(_.toInt).toSet
+ val newBrokerIds = curBrokerIds -- liveBrokerIds
+ val newBrokers = newBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+ val deletedBrokerIds = liveBrokerIds -- curBrokerIds
+ liveBrokers = curBrokerIds.map(ZkUtils.getBrokerInfo(zkClient, _)).filter(_.isDefined).map(_.get)
+ liveBrokerIds = liveBrokers.map(_.id)
+ info("Newly added brokers: %s, deleted brokers: %s, all brokers: %s"
+ .format(newBrokerIds.mkString(","), deletedBrokerIds.mkString(","), liveBrokerIds.mkString(",")))
+ newBrokers.foreach(controllerChannelManager.addBroker(_))
+ deletedBrokerIds.foreach(controllerChannelManager.removeBroker(_))
+ onBrokerChange(newBrokerIds)
+ }
}
}
}
@@ -591,4 +606,10 @@ class KafkaController(config : KafkaConf
}
}
}
-}
\ No newline at end of file
+}
+
+object ControllerStat extends KafkaMetricsGroup {
+ val offlinePartitionRate = newMeter("OfflinePartitionsPerSec", "partitions", TimeUnit.SECONDS)
+ val uncleanLeaderElectionRate = newMeter("UncleanLeaderElectionsPerSec", "elections", TimeUnit.SECONDS)
+ val leaderElectionTimer = new KafkaTimer(newTimer("LeaderElectionRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala Thu Sep 13 04:27:13 2012
@@ -19,7 +19,8 @@ package kafka.server
import kafka.network._
import kafka.utils._
-import java.util.concurrent.atomic.AtomicLong
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
/**
* A thread that answers kafka requests.
@@ -30,10 +31,11 @@ class KafkaRequestHandler(id: Int, broke
def run() {
while(true) {
val req = requestChannel.receiveRequest()
- if(req == RequestChannel.AllDone){
+ if(req eq RequestChannel.AllDone){
trace("receives shut down command, shut down".format(brokerId, id))
return
}
+ req.dequeueTimeNs = SystemTime.nanoseconds
debug("handles request " + req)
apis.handle(req)
}
@@ -63,62 +65,24 @@ class KafkaRequestHandlerPool(val broker
thread.join
info("shutted down completely")
}
-
}
-trait BrokerTopicStatMBean {
- def getMessagesIn: Long
- def getBytesIn: Long
- def getBytesOut: Long
- def getFailedProduceRequest: Long
- def getFailedFetchRequest: Long
+class BrokerTopicMetrics(name: String) extends KafkaMetricsGroup {
+ val messagesInRate = newMeter(name + "MessagesInPerSec", "messages", TimeUnit.SECONDS)
+ val bytesInRate = newMeter(name + "BytesInPerSec", "bytes", TimeUnit.SECONDS)
+ val bytesOutRate = newMeter(name + "BytesOutPerSec", "bytes", TimeUnit.SECONDS)
+ val failedProduceRequestRate = newMeter(name + "FailedProduceRequestsPerSec", "requests", TimeUnit.SECONDS)
+ val failedFetchRequestRate = newMeter(name + "FailedFetchRequestsPerSec", "requests", TimeUnit.SECONDS)
}
-@threadsafe
-class BrokerTopicStat extends BrokerTopicStatMBean {
- private val numCumulatedMessagesIn = new AtomicLong(0)
- private val numCumulatedBytesIn = new AtomicLong(0)
- private val numCumulatedBytesOut = new AtomicLong(0)
- private val numCumulatedFailedProduceRequests = new AtomicLong(0)
- private val numCumulatedFailedFetchRequests = new AtomicLong(0)
-
- def getMessagesIn: Long = numCumulatedMessagesIn.get
-
- def recordMessagesIn(nMessages: Int) = numCumulatedMessagesIn.getAndAdd(nMessages)
-
- def getBytesIn: Long = numCumulatedBytesIn.get
-
- def recordBytesIn(nBytes: Long) = numCumulatedBytesIn.getAndAdd(nBytes)
-
- def getBytesOut: Long = numCumulatedBytesOut.get
-
- def recordBytesOut(nBytes: Long) = numCumulatedBytesOut.getAndAdd(nBytes)
-
- def recordFailedProduceRequest = numCumulatedFailedProduceRequests.getAndIncrement
-
- def getFailedProduceRequest = numCumulatedFailedProduceRequests.get()
+object BrokerTopicStat extends Logging {
+ private val valueFactory = (k: String) => new BrokerTopicMetrics(k)
+ private val stats = new Pool[String, BrokerTopicMetrics](Some(valueFactory))
+ private val allTopicStat = new BrokerTopicMetrics("AllTopics")
- def recordFailedFetchRequest = numCumulatedFailedFetchRequests.getAndIncrement
+ def getBrokerAllTopicStat(): BrokerTopicMetrics = allTopicStat
- def getFailedFetchRequest = numCumulatedFailedFetchRequests.get()
-}
-
-object BrokerTopicStat extends Logging {
- private val stats = new Pool[String, BrokerTopicStat]
- private val allTopicStat = new BrokerTopicStat
- Utils.registerMBean(allTopicStat, "kafka:type=kafka.BrokerAllTopicStat")
-
- def getBrokerAllTopicStat(): BrokerTopicStat = allTopicStat
-
- def getBrokerTopicStat(topic: String): BrokerTopicStat = {
- var stat = stats.get(topic)
- if (stat == null) {
- stat = new BrokerTopicStat
- if (stats.putIfNotExists(topic, stat) == null)
- Utils.registerMBean(stat, "kafka:type=kafka.BrokerTopicStat." + topic)
- else
- stat = stats.get(topic)
- }
- return stat
+ def getBrokerTopicStat(topic: String): BrokerTopicMetrics = {
+ stats.getAndMaybePut(topic + "-")
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala Thu Sep 13 04:27:13 2012
@@ -18,7 +18,7 @@
package kafka.server
import java.io.File
-import kafka.network.{SocketServerStats, SocketServer}
+import kafka.network.SocketServer
import kafka.log.LogManager
import kafka.utils._
import java.util.concurrent._
@@ -34,7 +34,6 @@ class KafkaServer(val config: KafkaConfi
val CleanShutdownFile = ".kafka_cleanshutdown"
private var isShuttingDown = new AtomicBoolean(false)
private var shutdownLatch = new CountDownLatch(1)
- private val statsMBeanName = "kafka:type=kafka.SocketServerStats"
var socketServer: SocketServer = null
var requestHandlerPool: KafkaRequestHandlerPool = null
var logManager: LogManager = null
@@ -82,8 +81,6 @@ class KafkaServer(val config: KafkaConfi
socketServer.startup
- Utils.registerMBean(socketServer.stats, statsMBeanName)
-
/* start client */
kafkaZookeeper = new KafkaZooKeeper(config)
// starting relevant replicas and leader election for partitions assigned to this broker
@@ -123,7 +120,6 @@ class KafkaServer(val config: KafkaConfi
replicaManager.shutdown()
if (socketServer != null)
socketServer.shutdown()
- Utils.unregisterMBean(statsMBeanName)
if(logManager != null)
logManager.shutdown()
@@ -144,8 +140,6 @@ class KafkaServer(val config: KafkaConfi
def awaitShutdown(): Unit = shutdownLatch.await()
def getLogManager(): LogManager = logManager
-
- def getStats(): SocketServerStats = socketServer.stats
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala Thu Sep 13 04:27:13 2012
@@ -24,13 +24,16 @@ import kafka.utils._
import kafka.log.LogManager
import kafka.api.{LeaderAndIsrRequest, LeaderAndIsr}
import kafka.common.{UnknownTopicOrPartitionException, LeaderNotAvailableException, ErrorMapping}
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.TimeUnit
object ReplicaManager {
val UnknownLogEndOffset = -1L
}
class ReplicaManager(val config: KafkaConfig, time: Time, val zkClient: ZkClient, kafkaScheduler: KafkaScheduler,
- val logManager: LogManager) extends Logging {
+ val logManager: LogManager) extends Logging with KafkaMetricsGroup {
private val allPartitions = new Pool[(String, Int), Partition]
private var leaderPartitions = new mutable.HashSet[Partition]()
private val leaderPartitionsLock = new Object
@@ -41,6 +44,26 @@ class ReplicaManager(val config: KafkaCo
val highWatermarkCheckpoint = new HighwaterMarkCheckpoint(config.logDir)
info("Created highwatermark file %s".format(highWatermarkCheckpoint.name))
+ newGauge(
+ "LeaderCount",
+ new Gauge[Int] {
+ def value() = leaderPartitions.size
+ }
+ )
+ newGauge(
+ "UnderReplicatedPartitions",
+ new Gauge[Int] {
+ def value() = {
+ leaderPartitionsLock synchronized {
+ leaderPartitions.count(_.isUnderReplicated)
+ }
+ }
+ }
+ )
+ val isrExpandRate = newMeter("IsrExpandsPerSec", "expands", TimeUnit.SECONDS)
+ val isrShrinkRate = newMeter("ISRShrinksPerSec", "shrinks", TimeUnit.SECONDS)
+
+
def startHighWaterMarksCheckPointThread() = {
if(highWatermarkCheckPointThreadStarted.compareAndSet(false, true))
kafkaScheduler.scheduleWithRate(checkpointHighWatermarks, "highwatermark-checkpoint-thread", 0, config.defaultFlushIntervalMs)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala Thu Sep 13 04:27:13 2012
@@ -33,7 +33,6 @@ import kafka.metrics.KafkaMetricsGroup
* for example a key could be a (topic, partition) pair.
*/
class DelayedRequest(val keys: Seq[Any], val request: RequestChannel.Request, delayMs: Long) extends DelayedItem[RequestChannel.Request](request, delayMs) {
- val creationTimeNs = SystemTime.nanoseconds
val satisfied = new AtomicBoolean(false)
}
@@ -67,32 +66,13 @@ abstract class RequestPurgatory[T <: Del
/* a list of requests watching each key */
private val watchersForKey = new Pool[Any, Watchers](Some((key: Any) => new Watchers))
- private val numDelayedRequestsBeanName = "NumDelayedRequests"
- private val timeToSatisfyHistogramBeanName = "TimeToSatisfyInNs"
- private val satisfactionRateBeanName = "SatisfactionRate"
- private val expirationRateBeanName = "ExpirationRate"
-
- val satisfactionRateMeter = newMeter(
- satisfactionRateBeanName,
- "requests",
- TimeUnit.SECONDS
- )
-
- val timeToSatisfyHistogram = newHistogram(timeToSatisfyHistogramBeanName, biased = true)
-
newGauge(
- numDelayedRequestsBeanName,
+ "NumDelayedRequests",
new Gauge[Int] {
def value() = expiredRequestReaper.unsatisfied.get()
}
)
- val expirationRateMeter = newMeter(
- expirationRateBeanName,
- "requests",
- TimeUnit.SECONDS
- )
-
/* background thread expiring requests that have been waiting too long */
private val expiredRequestReaper = new ExpiredRequestReaper
private val expirationThread = Utils.daemonThread("request-expiration-task", expiredRequestReaper)
@@ -196,10 +176,6 @@ abstract class RequestPurgatory[T <: Del
iter.remove()
val updated = curr.satisfied.compareAndSet(false, true)
if(updated == true) {
- val requestNs = SystemTime.nanoseconds - curr.creationTimeNs
- satisfactionRateMeter.mark()
- timeToSatisfyHistogram.update(requestNs)
-
response += curr
liveCount -= 1
expiredRequestReaper.satisfyRequest()
@@ -282,7 +258,6 @@ abstract class RequestPurgatory[T <: Del
val curr = delayed.take()
val updated = curr.satisfied.compareAndSet(false, true)
if(updated) {
- expirationRateMeter.mark()
unsatisfied.getAndDecrement()
for(key <- curr.keys)
watchersFor(key).decLiveCount()
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala Thu Sep 13 04:27:13 2012
@@ -21,12 +21,14 @@ import java.util.ArrayList
import java.util.concurrent._
import collection.JavaConversions
import kafka.common.KafkaException
+import java.lang.Object
class Pool[K,V](valueFactory: Option[(K) => V] = None) extends Iterable[(K, V)] {
private val pool = new ConcurrentHashMap[K, V]
-
+ private val createLock = new Object
+
def this(m: collection.Map[K, V]) {
this()
m.foreach(kv => pool.put(kv._1, kv._2))
@@ -52,8 +54,12 @@ class Pool[K,V](valueFactory: Option[(K)
throw new KafkaException("Empty value factory in pool.")
val curr = pool.get(key)
if (curr == null) {
- pool.putIfAbsent(key, valueFactory.get(key))
- pool.get(key)
+ createLock synchronized {
+ val curr = pool.get(key)
+ if (curr == null)
+ pool.put(key, valueFactory.get(key))
+ pool.get(key)
+ }
}
else
curr
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala Thu Sep 13 04:27:13 2012
@@ -20,7 +20,6 @@ package kafka.utils
import java.io._
import java.nio._
import java.nio.channels._
-import java.util.concurrent.atomic._
import java.lang.management._
import java.util.zip.CRC32
import javax.management._
@@ -685,100 +684,6 @@ object Utils extends Logging {
for (forever <- Stream.continually(1); t <- coll) yield t
stream.iterator
}
-
-}
-
-class SnapshotStats(private val monitorDurationNs: Long = 600L * 1000L * 1000L * 1000L) {
- private val time: Time = SystemTime
-
- private val complete = new AtomicReference(new Stats())
- private val current = new AtomicReference(new Stats())
- private val total = new AtomicLong(0)
- private val numCumulatedRequests = new AtomicLong(0)
-
- def recordRequestMetric(requestNs: Long) {
- val stats = current.get
- stats.add(requestNs)
- total.getAndAdd(requestNs)
- numCumulatedRequests.getAndAdd(1)
- val ageNs = time.nanoseconds - stats.start
- // if the current stats are too old it is time to swap
- if(ageNs >= monitorDurationNs) {
- val swapped = current.compareAndSet(stats, new Stats())
- if(swapped) {
- complete.set(stats)
- stats.end.set(time.nanoseconds)
- }
- }
- }
-
- def recordThroughputMetric(data: Long) {
- val stats = current.get
- stats.addData(data)
- val ageNs = time.nanoseconds - stats.start
- // if the current stats are too old it is time to swap
- if(ageNs >= monitorDurationNs) {
- val swapped = current.compareAndSet(stats, new Stats())
- if(swapped) {
- complete.set(stats)
- stats.end.set(time.nanoseconds)
- }
- }
- }
-
- def getNumRequests(): Long = numCumulatedRequests.get
-
- def getRequestsPerSecond: Double = {
- val stats = complete.get
- stats.numRequests / stats.durationSeconds
- }
-
- def getThroughput: Double = {
- val stats = complete.get
- stats.totalData / stats.durationSeconds
- }
-
- def getAvgMetric: Double = {
- val stats = complete.get
- if (stats.numRequests == 0) {
- 0
- }
- else {
- stats.totalRequestMetric / stats.numRequests
- }
- }
-
- def getTotalMetric: Long = total.get
-
- def getMaxMetric: Double = complete.get.maxRequestMetric
-
- class Stats {
- val start = time.nanoseconds
- var end = new AtomicLong(-1)
- var numRequests = 0
- var totalRequestMetric: Long = 0L
- var maxRequestMetric: Long = 0L
- var totalData: Long = 0L
- private val lock = new Object()
-
- def addData(data: Long) {
- lock synchronized {
- totalData += data
- }
- }
-
- def add(requestNs: Long) {
- lock synchronized {
- numRequests +=1
- totalRequestMetric += requestNs
- maxRequestMetric = scala.math.max(maxRequestMetric, requestNs)
- }
- }
-
- def durationSeconds: Double = (end.get - start) / (1000.0 * 1000.0 * 1000.0)
-
- def durationMs: Double = (end.get - start) / (1000.0 * 1000.0)
- }
}
/**
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala Thu Sep 13 04:27:13 2012
@@ -29,8 +29,7 @@ import kafka.utils.TestUtils
import kafka.utils.TestUtils._
import kafka.server.{ReplicaManager, KafkaApis, KafkaConfig}
import kafka.common.ErrorMapping
-import kafka.api.{TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
-
+import kafka.api.{RequestKeys, TopicMetadata, TopicMetaDataResponse, TopicMetadataRequest}
class TopicMetadataTest extends JUnit3Suite with ZooKeeperTestHarness {
val props = createBrokerConfigs(1)
@@ -106,29 +105,20 @@ class TopicMetadataTest extends JUnit3Su
// create a topic metadata request
val topicMetadataRequest = new TopicMetadataRequest(List(topic))
- val serializedMetadataRequest = ByteBuffer.allocate(topicMetadataRequest.sizeInBytes + 2)
- topicMetadataRequest.writeTo(serializedMetadataRequest)
- serializedMetadataRequest.rewind()
+ val serializedMetadataRequest = TestUtils.createRequestByteBuffer(topicMetadataRequest)
// create the kafka request handler
val requestChannel = new RequestChannel(2, 5)
val apis = new KafkaApis(requestChannel, replicaManager, zkClient, 1)
- // mock the receive API to return the request buffer as created above
- val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
- EasyMock.expect(receivedRequest.buffer).andReturn(serializedMetadataRequest)
- EasyMock.replay(receivedRequest)
-
// call the API (to be tested) to get metadata
- apis.handleTopicMetadataRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
- val metadataResponse = requestChannel.receiveResponse(0).response.asInstanceOf[BoundedByteBufferSend].buffer
+ apis.handleTopicMetadataRequest(new RequestChannel.Request
+ (processor=0, requestKey=RequestKeys.MetadataKey, buffer=serializedMetadataRequest, startTimeNs=1))
+ val metadataResponse = requestChannel.receiveResponse(0).responseSend.asInstanceOf[BoundedByteBufferSend].buffer
// check assertions
val topicMetadata = TopicMetaDataResponse.readFrom(metadataResponse).topicsMetadata
- // verify the expected calls to log manager occurred in the right order
- EasyMock.verify(receivedRequest)
-
topicMetadata
}
}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala Thu Sep 13 04:27:13 2012
@@ -24,6 +24,9 @@ import org.scalatest.junit.JUnitSuite
import kafka.utils.TestUtils
import java.util.Random
import junit.framework.Assert._
+import kafka.producer.SyncProducerConfig
+import kafka.api.{TopicData, ProducerRequest}
+import java.nio.ByteBuffer
class SocketServerTest extends JUnitSuite {
@@ -54,9 +57,9 @@ class SocketServerTest extends JUnitSuit
/* A simple request handler that just echos back the response */
def processRequest(channel: RequestChannel) {
val request = channel.receiveRequest
- val id = request.request.buffer.getShort
- val send = new BoundedByteBufferSend(request.request.buffer.slice)
- channel.sendResponse(new RequestChannel.Response(request.processor, request.requestKey, send, request.start, 15))
+ val id = request.buffer.getShort
+ val send = new BoundedByteBufferSend(request.buffer.slice)
+ channel.sendResponse(new RequestChannel.Response(request.processor, request, send))
}
def connect() = new Socket("localhost", server.port)
@@ -69,10 +72,21 @@ class SocketServerTest extends JUnitSuit
@Test
def simpleRequest() {
val socket = connect()
- sendRequest(socket, 0, "hello".getBytes)
+ val correlationId = SyncProducerConfig.DefaultCorrelationId
+ val clientId = SyncProducerConfig.DefaultClientId
+ val ackTimeoutMs = SyncProducerConfig.DefaultAckTimeoutMs
+ val ack = SyncProducerConfig.DefaultRequiredAcks
+ val emptyRequest = new ProducerRequest(correlationId, clientId, ack, ackTimeoutMs, Array[TopicData]())
+
+ val byteBuffer = ByteBuffer.allocate(emptyRequest.sizeInBytes())
+ emptyRequest.writeTo(byteBuffer)
+ byteBuffer.rewind()
+ val serializedBytes = new Array[Byte](byteBuffer.remaining)
+ byteBuffer.get(serializedBytes)
+
+ sendRequest(socket, 0, serializedBytes)
processRequest(server.requestChannel)
- val response = new String(receiveResponse(socket))
- assertEquals("hello", response)
+ assertEquals(serializedBytes.toSeq, receiveResponse(socket).toSeq)
}
@Test(expected=classOf[IOException])
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala Thu Sep 13 04:27:13 2012
@@ -16,16 +16,15 @@
*/
package kafka.server
-import java.nio.ByteBuffer
-import kafka.api.{FetchRequest, FetchRequestBuilder}
import kafka.cluster.{Partition, Replica}
import kafka.log.Log
import kafka.message.{ByteBufferMessageSet, Message}
-import kafka.network.{BoundedByteBufferReceive, RequestChannel}
+import kafka.network.RequestChannel
import kafka.utils.{Time, TestUtils, MockTime}
import org.easymock.EasyMock
import org.I0Itec.zkclient.ZkClient
import org.scalatest.junit.JUnit3Suite
+import kafka.api.{FetchRequest, FetchRequestBuilder}
class SimpleFetchTest extends JUnit3Suite {
@@ -92,16 +91,10 @@ class SimpleFetchTest extends JUnit3Suit
.replicaId(FetchRequest.NonFollowerId)
.addFetch(topic, partitionId, 0, hw*2)
.build()
- val goodFetchBB = ByteBuffer.allocate(goodFetch.sizeInBytes)
- goodFetch.writeTo(goodFetchBB)
- goodFetchBB.rewind()
-
- val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
- EasyMock.expect(receivedRequest.buffer).andReturn(goodFetchBB)
- EasyMock.replay(receivedRequest)
+ val goodFetchBB = TestUtils.createRequestByteBuffer(goodFetch)
// send the request
- apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, request=receivedRequest, start=1))
+ apis.handleFetchRequest(new RequestChannel.Request(processor=1, requestKey=5, buffer=goodFetchBB, startTimeNs=1))
// make sure the log only reads bytes between 0->HW (5)
EasyMock.verify(log)
@@ -170,16 +163,10 @@ class SimpleFetchTest extends JUnit3Suit
.addFetch(topic, partitionId, followerLEO, Integer.MAX_VALUE)
.build()
- val fetchRequest = ByteBuffer.allocate(bigFetch.sizeInBytes)
- bigFetch.writeTo(fetchRequest)
- fetchRequest.rewind()
-
- val receivedRequest = EasyMock.createMock(classOf[BoundedByteBufferReceive])
- EasyMock.expect(receivedRequest.buffer).andReturn(fetchRequest)
- EasyMock.replay(receivedRequest)
+ val fetchRequestBB = TestUtils.createRequestByteBuffer(bigFetch)
// send the request
- apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, request=receivedRequest, start=1))
+ apis.handleFetchRequest(new RequestChannel.Request(processor=0, requestKey=5, buffer=fetchRequestBB, startTimeNs=1))
/**
* Make sure the log satisfies the fetch from a follower by reading data beyond the HW, mainly all bytes after
Modified: incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala (original)
+++ incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala Thu Sep 13 04:27:13 2012
@@ -468,6 +468,14 @@ object TestUtils extends Logging {
}
}
}
+
+ def createRequestByteBuffer(request: RequestOrResponse): ByteBuffer = {
+ val byteBuffer = ByteBuffer.allocate(request.sizeInBytes + 2)
+ byteBuffer.putShort(request.requestId.get)
+ request.writeTo(byteBuffer)
+ byteBuffer.rewind()
+ byteBuffer
+ }
}
object TestZKUtils {
Modified: incubator/kafka/branches/0.8/system_test/metrics.json
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/system_test/metrics.json?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/system_test/metrics.json (original)
+++ incubator/kafka/branches/0.8/system_test/metrics.json Thu Sep 13 04:27:13 2012
@@ -4,136 +4,118 @@
"role": "broker",
"graphs": [
{
- "graph_name": "SocketServerThroughput",
- "y_label": "bytes-read-per-second,bytes-written-per-second",
- "bean_name": "kafka:type=kafka.SocketServerStats",
- "attributes": "BytesReadPerSecond,BytesWrittenPerSecond"
+ "graph_name": "Produce-Request-Rate",
+ "y_label": "requests-per-sec",
+ "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RequestsPerSec",
+ "attributes": "OneMinuteRate"
},
{
- "graph_name": "FetchRequestPurgatoryNumDelayedRequests",
- "y_label": "num-delayed-requests",
- "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
- "attributes": "Value"
+ "graph_name": "Produce-Request-Time",
+ "y_label": "ns,ns",
+ "bean_name": "kafka.network:type=RequestMetrics,name=Produce-TotalTimeNs",
+ "attributes": "Mean,99thPercentile"
},
{
- "graph_name": "MeanFetchRequestPurgatorySatisfactionRate",
- "y_label": "mean-request-satisfaction-rate",
- "bean_name": "kafka.server:type=FetchRequestPurgatory,name=SatisfactionRate",
- "attributes": "MeanRate"
+ "graph_name": "Produce-Request-Remote-Time",
+ "y_label": "ns,ns",
+ "bean_name": "kafka.network:type=RequestMetrics,name=Produce-RemoteTimeNs",
+ "attributes": "Mean,99thPercentile"
},
{
- "graph_name": "FetchRequestPurgatoryTimeToSatisfy",
- "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
- "bean_name": "kafka.server:type=FetchRequestPurgatory,name=TimeToSatisfyInNs",
- "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ "graph_name": "Fetch-Consumer-Request-Rate",
+ "y_label": "requests-per-sec",
+ "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RequestsPerSec",
+ "attributes": "OneMinuteRate"
},
{
- "graph_name": "FetchRequestPurgatoryExpirationRate",
- "y_label": "expiration-rate",
- "bean_name": "kafka.server:type=FetchRequestPurgatory,name=ExpirationRate",
- "attributes": "MeanRate"
- },
- {
- "graph_name": "ProducerRequestPurgatoryNumDelayedRequests",
- "y_label": "num-delayed-requests",
- "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
- "attributes": "Value"
- },
- {
- "graph_name": "MeanProducerRequestPurgatorySatisfactionRate",
- "y_label": "mean-request-satisfaction-rate",
- "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=SatisfactionRate",
- "attributes": "MeanRate"
- },
- {
- "graph_name": "ProducerRequestPurgatoryExpirationRate",
- "y_label": "expiration-rate",
- "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=ExpirationRate",
- "attributes": "MeanRate"
+ "graph_name": "Fetch-Consumer-Request-Time",
+ "y_label": "ns,ns",
+ "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-TotalTimeNs",
+ "attributes": "Mean,99thPercentile"
},
- {
- "graph_name": "DelayedProducerRequests-CaughtUpFollowerFetchRequestsPerSecond",
- "y_label": "mean-caught-up-follower-fetch-requests-per-second",
- "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=CaughtUpFollowerRequestsPerSecond-all",
- "attributes": "MeanRate"
+ {
+ "graph_name": "Fetch-Consumer-Request-Remote-Time",
+ "y_label": "ns,ns",
+ "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Consumer-RemoteTimeNs",
+ "attributes": "Mean,99thPercentile"
},
- {
- "graph_name": "DelayedProducerRequests-ExpiredRequestRate",
- "y_label": "mean-expired-request-rate",
- "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=ExpiredRequestsPerSecond-all",
- "attributes": "MeanRate"
+ {
+ "graph_name": "Fetch-Follower-Request-Rate",
+ "y_label": "requests-per-sec",
+ "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RequestsPerSec",
+ "attributes": "OneMinuteRate"
},
- {
- "graph_name": "DelayedProducerRequests-FollowerCatchUpLatency",
- "y_label": "mean-follower-catchup-time-ns,95th-percentile-follower-catchup-time-ns,99th-percentile-follower-catchup-time-ns,999th-percentile-follower-catchup-time-ns",
- "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=FollowerCatchUpTimeInNs",
- "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ {
+ "graph_name": "Fetch-Follower-Request-Time",
+ "y_label": "ns,ns",
+ "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-TotalTimeNs",
+ "attributes": "Mean,99thPercentile"
},
- {
- "graph_name": "DelayedProducerRequests-SatisfactionTimeInNs",
- "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
- "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfactionTimeInNs",
- "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ {
+ "graph_name": "Fetch-Follower-Request-Remote-Time",
+ "y_label": "ns,ns",
+ "bean_name": "kafka.network:type=RequestMetrics,name=Fetch-Follower-RemoteTimeNs",
+ "attributes": "Mean,99thPercentile"
},
- {
- "graph_name": "DelayedProducerRequests-SatisfiedRequestsPerSecond",
- "y_label": "mean-satisfaction-requests-per-second",
- "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=SatisfiedRequestsPerSecond",
- "attributes": "MeanRate"
+ {
+ "graph_name": "ProducePurgatoryExpirationRate",
+ "y_label": "expirations-per-sec",
+ "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=AllExpiresPerSecond",
+ "attributes": "OneMinuteRate"
},
- {
- "graph_name": "DelayedProducerRequests-Throughput-all",
- "y_label": "mean-purgatory-throughput-all",
- "bean_name": "kafka.server:type=DelayedProducerRequestMetrics,name=Throughput-all",
- "attributes": "MeanRate"
+ {
+ "graph_name": "FetchConsumerPurgatoryExpirationRate",
+ "y_label": "expirations-per-sec",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=ConsumerExpiresPerSecond",
+ "attributes": "OneMinuteRate"
},
{
- "graph_name": "DelayedFetchRequests-Follower-ExpiredRequestRate",
- "y_label": "mean-expired-request-rate",
- "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-ExpiredRequestsPerSecond",
- "attributes": "MeanRate"
+ "graph_name": "FetchFollowerPurgatoryExpirationRate",
+ "y_label": "expirations-per-sec",
+ "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=FollowerExpiresPerSecond",
+ "attributes": "OneMinuteRate"
},
{
- "graph_name": "DelayedFetchRequests-Follower-SatisfactionTimeInNs",
- "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
- "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfactionTimeInNs",
- "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ "graph_name": "ProducePurgatoryQueueSize",
+ "y_label": "size",
+ "bean_name": "kafka.server:type=ProducerRequestPurgatory,name=NumDelayedRequests",
+ "attributes": "Value"
},
{
- "graph_name": "DelayedProducerRequests-Follower-SatisfiedRequestsPerSecond",
- "y_label": "mean-satisfaction-requests-per-second",
- "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-SatisfiedRequestsPerSecond",
- "attributes": "MeanRate"
+ "graph_name": "FetchPurgatoryQueueSize",
+ "y_label": "size",
+ "bean_name": "kafka.server:type=FetchRequestPurgatory,name=NumDelayedRequests",
+ "attributes": "Value"
},
{
- "graph_name": "DelayedFetchRequests-Follower-Throughput-all",
- "y_label": "mean-purgatory-throughput-all",
- "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=Follower-Throughput-all",
- "attributes": "MeanRate"
+ "graph_name": "ControllerLeaderElectionRateAndTime",
+ "y_label": "elections-per-sec,ms,ms",
+ "bean_name": "kafka.server:type=ControllerStat,name=LeaderElectionRateAndTimeMs",
+ "attributes": "OneMinuteRate,Mean,99thPercentile"
},
{
- "graph_name": "DelayedFetchRequests-NonFollower-ExpiredRequestRate",
- "y_label": "mean-expired-request-rate",
- "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-ExpiredRequestsPerSecond",
- "attributes": "MeanRate"
+ "graph_name": "LogFlushRateAndTime",
+ "y_label": "flushes-per-sec,ms,ms",
+ "bean_name": "kafka.message:type=LogFlushStats,name=LogFlushRateAndTimeMs",
+ "attributes": "OneMinuteRate,Mean,99thPercentile"
},
{
- "graph_name": "DelayedFetchRequests-NonFollower-SatisfactionTimeInNs",
- "y_label": "mean-time-to-satisfy-ns,95th-percentile-time-to-satisfy-ns,99th-percentile-time-to-satisfy-ns,999th-percentile-time-to-satisfy-ns",
- "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfactionTimeInNs",
- "attributes": "Mean,95thPercentile,99thPercentile,999thPercentile"
+ "graph_name": "AllBytesOutRate",
+ "y_label": "bytes-per-sec",
+ "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesOutPerSec",
+ "attributes": "OneMinuteRate"
},
{
- "graph_name": "DelayedFetchRequests-NonFollower-SatisfiedRequestsPerSecond",
- "y_label": "mean-satisfaction-requests-per-second",
- "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-SatisfiedRequestsPerSecond",
- "attributes": "MeanRate"
+ "graph_name": "AllBytesInRate",
+ "y_label": "bytes-per-sec",
+ "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsBytesInPerSec",
+ "attributes": "OneMinuteRate"
},
{
- "graph_name": "DelayedFetchRequests-NonFollower-Throughput-all",
- "y_label": "mean-purgatory-throughput-all",
- "bean_name": "kafka.server:type=DelayedFetchRequestMetrics,name=NonFollower-Throughput-all",
- "attributes": "MeanRate"
+ "graph_name": "AllMessagesInRate",
+ "y_label": "messages-per-sec",
+ "bean_name": "kafka.server:type=BrokerTopicMetrics,name=AllTopicsMessagesInPerSec",
+ "attributes": "OneMinuteRate"
}
]
},
@@ -141,10 +123,16 @@
"role": "producer_performance",
"graphs": [
{
- "graph_name": "ProducerStats",
- "y_label": "avg-producer-latency-ms,max-producer-latency-ms,produce-request-throughput",
- "bean_name": "kafka:type=kafka.KafkaProducerStats",
- "attributes": "AvgProduceRequestMs,MaxProduceRequestMs,ProduceRequestsPerSecond"
+ "graph_name": "ProduceRequestRateAndTime",
+ "y_label": "requests-per-sec,ms,ms",
+ "bean_name": "kafka.producer:type=ProducerRequestStat,name=ProduceRequestRateAndTimeMs",
+ "attributes": "OneMinuteRate,Mean,99thPercentile"
+ },
+ {
+ "graph_name": "ProduceRequestSize",
+ "y_label": "bytes,bytes",
+ "bean_name": "kafka.producer:type=ProducerRequestStat,name=ProducerRequestSize",
+ "attributes": "Mean,99thPercentile"
}
]
},
@@ -152,10 +140,22 @@
"role": "console_consumer",
"graphs": [
{
- "graph_name": "SimpleConsumerRequestStats",
- "y_label": "simple-consumer-throughput,simple-consumer-throughput-bytes,simple-consumer-latency-ms",
- "bean_name": "kafka:type=kafka.SimpleConsumerStats",
- "attributes": "FetchRequestsPerSecond,ConsumerThroughput,AvgFetchRequestMs"
+ "graph_name": "FetchRequestRateAndTime",
+ "y_label": "requests-per-sec,ms,ms",
+ "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchRequestRateAndTimeMs",
+ "attributes": "OneMinuteRate,Mean,99thPercentile"
+ },
+ {
+ "graph_name": "FetchResponseSize",
+ "y_label": "bytes,bytes",
+ "bean_name": "kafka.consumer:type=FetchRequestAndResponseStat,name=FetchResponseSize",
+ "attributes": "Mean,99thPercentile"
+ },
+ {
+ "graph_name": "ConsumedMessageRate",
+ "y_label": "messages-per-sec",
+ "bean_name": "kafka.consumer:type=ConsumerTopicStat,name=AllTopicsMessagesPerSec",
+ "attributes": "OneMinuteRate"
}
]
},