You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2012/09/13 06:27:16 UTC
svn commit: r1384202 [1/2] - in /incubator/kafka/branches/0.8:
core/src/main/scala/kafka/api/ core/src/main/scala/kafka/cluster/
core/src/main/scala/kafka/consumer/ core/src/main/scala/kafka/javaapi/
core/src/main/scala/kafka/log/ core/src/main/scala/k...
Author: junrao
Date: Thu Sep 13 04:27:13 2012
New Revision: 1384202
URL: http://svn.apache.org/viewvc?rev=1384202&view=rev
Log:
Improve Kafka internal metrics; patched by Jun Rao; reviewed by Joel Koshy and Neha Narkhede; KAFKA-203
Removed:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/LogStats.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServerStats.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStatsMBean.scala
Modified:
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaController.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaRequestHandler.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaServer.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/ReplicaManager.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/RequestPurgatory.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Pool.scala
incubator/kafka/branches/0.8/core/src/main/scala/kafka/utils/Utils.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/integration/TopicMetadataTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/network/SocketServerTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/server/SimpleFetchTest.scala
incubator/kafka/branches/0.8/core/src/test/scala/unit/kafka/utils/TestUtils.scala
incubator/kafka/branches/0.8/system_test/metrics.json
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/FetchRequest.scala Thu Sep 13 04:27:13 2012
@@ -105,7 +105,7 @@ case class FetchRequest(versionId: Short
replicaId: Int = FetchRequest.DefaultReplicaId,
maxWait: Int = FetchRequest.DefaultMaxWait,
minBytes: Int = FetchRequest.DefaultMinBytes,
- offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.Fetch)) {
+ offsetInfo: Seq[OffsetDetail] ) extends RequestOrResponse(Some(RequestKeys.FetchKey)) {
// ensure that a topic "X" appears in at most one OffsetDetail
def validate() {
@@ -144,6 +144,8 @@ case class FetchRequest(versionId: Short
def sizeInBytes: Int = 2 + 4 + (2 + clientId.length()) + 4 + 4 + 4 + offsetInfo.foldLeft(4)(_ + _.sizeInBytes())
def numPartitions: Int = offsetInfo.foldLeft(0)(_ + _.offsets.size)
+
+ def isFromFollower(): Boolean = replicaId != FetchRequest.NonFollowerId
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/LeaderAndIsrRequest.scala Thu Sep 13 04:27:13 2012
@@ -94,7 +94,7 @@ case class LeaderAndIsrRequest (versionI
isInit: Boolean,
ackTimeoutMs: Int,
leaderAndISRInfos: Map[(String, Int), LeaderAndIsr])
- extends RequestOrResponse(Some(RequestKeys.LeaderAndISRRequest)) {
+ extends RequestOrResponse(Some(RequestKeys.LeaderAndIsrKey)) {
def this(isInit: Boolean, leaderAndISRInfos: Map[(String, Int), LeaderAndIsr]) = {
this(LeaderAndIsrRequest.CurrentVersion, LeaderAndIsrRequest.DefaultClientId, isInit, LeaderAndIsrRequest.DefaultAckTimeout, leaderAndISRInfos)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/OffsetRequest.scala Thu Sep 13 04:27:13 2012
@@ -45,7 +45,7 @@ case class OffsetRequest(versionId: Shor
topic: String,
partition: Int,
time: Long,
- maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.Offsets)) {
+ maxNumOffsets: Int) extends RequestOrResponse(Some(RequestKeys.OffsetsKey)) {
def this(topic: String, partition: Int, time: Long, maxNumOffsets: Int) =
this(OffsetRequest.CurrentVersion, OffsetRequest.DefaultClientId, topic, partition, time, maxNumOffsets)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/ProducerRequest.scala Thu Sep 13 04:27:13 2012
@@ -24,7 +24,7 @@ import kafka.utils._
object ProducerRequest {
val CurrentVersion: Short = 0
-
+
def readFrom(buffer: ByteBuffer): ProducerRequest = {
val versionId: Short = buffer.getShort
val correlationId: Int = buffer.getInt
@@ -58,7 +58,7 @@ case class ProducerRequest( versionId: S
clientId: String,
requiredAcks: Short,
ackTimeoutMs: Int,
- data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+ data: Array[TopicData] ) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
def this(correlationId: Int, clientId: String, requiredAcks: Short, ackTimeoutMs: Int, data: Array[TopicData]) =
this(ProducerRequest.CurrentVersion, correlationId, clientId, requiredAcks, ackTimeoutMs, data)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/RequestKeys.scala Thu Sep 13 04:27:13 2012
@@ -17,11 +17,36 @@
package kafka.api
+import kafka.common.KafkaException
+import java.nio.ByteBuffer
+
object RequestKeys {
- val Produce: Short = 0
- val Fetch: Short = 1
- val Offsets: Short = 2
- val TopicMetadata: Short = 3
- val LeaderAndISRRequest: Short = 4
- val StopReplicaRequest: Short = 5
+ val ProduceKey: Short = 0
+ val FetchKey: Short = 1
+ val OffsetsKey: Short = 2
+ val MetadataKey: Short = 3
+ val LeaderAndIsrKey: Short = 4
+ val StopReplicaKey: Short = 5
+
+ val keyToNameAndDeserializerMap: Map[Short, (String, (ByteBuffer) => RequestOrResponse)]=
+ Map( ProduceKey -> ("Produce", ProducerRequest.readFrom),
+ FetchKey -> ("Fetch", FetchRequest.readFrom),
+ OffsetsKey -> ("Offsets", OffsetRequest.readFrom),
+ MetadataKey -> ("Metadata", TopicMetadataRequest.readFrom),
+ LeaderAndIsrKey -> ("LeaderAndIsr", LeaderAndIsrRequest.readFrom),
+ StopReplicaKey -> ("StopReplica", StopReplicaRequest.readFrom) )
+
+ def nameForKey(key: Short): String = {
+ keyToNameAndDeserializerMap.get(key) match {
+ case Some(nameAndSerializer) => nameAndSerializer._1
+ case None => throw new KafkaException("Wrong request type %d".format(key))
+ }
+ }
+
+ def deserializerForKey(key: Short): (ByteBuffer) => RequestOrResponse = {
+ keyToNameAndDeserializerMap.get(key) match {
+ case Some(nameAndSerializer) => nameAndSerializer._2
+ case None => throw new KafkaException("Wrong request type %d".format(key))
+ }
+ }
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/StopReplicaRequest.scala Thu Sep 13 04:27:13 2012
@@ -45,7 +45,7 @@ case class StopReplicaRequest(versionId:
clientId: String,
ackTimeoutMs: Int,
stopReplicaSet: Set[(String, Int)])
- extends RequestOrResponse(Some(RequestKeys.StopReplicaRequest)) {
+ extends RequestOrResponse(Some(RequestKeys.StopReplicaKey)) {
def this(stopReplicaSet: Set[(String, Int)]) = {
this(StopReplicaRequest.CurrentVersion, StopReplicaRequest.DefaultClientId, StopReplicaRequest.DefaultAckTimeout, stopReplicaSet)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/api/TopicMetadataRequest.scala Thu Sep 13 04:27:13 2012
@@ -78,7 +78,7 @@ case class TopicMetadataRequest(val vers
val topics: Seq[String],
val detailedMetadata: DetailedMetadataRequest = NoSegmentMetadata,
val timestamp: Option[Long] = None, val count: Option[Int] = None)
- extends RequestOrResponse(Some(RequestKeys.TopicMetadata)){
+ extends RequestOrResponse(Some(RequestKeys.MetadataKey)){
def this(topics: Seq[String]) =
this(TopicMetadataRequest.CurrentVersion, TopicMetadataRequest.DefaultClientId, topics, NoSegmentMetadata, None, None)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Partition.scala Thu Sep 13 04:27:13 2012
@@ -22,6 +22,8 @@ import java.lang.Object
import kafka.api.LeaderAndIsr
import kafka.server.ReplicaManager
import kafka.common.ErrorMapping
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
/**
* Data structure that represents a topic partition. The leader maintains the AR, ISR, CUR, RAR
@@ -29,7 +31,7 @@ import kafka.common.ErrorMapping
class Partition(val topic: String,
val partitionId: Int,
time: Time,
- val replicaManager: ReplicaManager) extends Logging {
+ val replicaManager: ReplicaManager) extends Logging with KafkaMetricsGroup {
private val localBrokerId = replicaManager.config.brokerId
private val logManager = replicaManager.logManager
private val replicaFetcherManager = replicaManager.replicaFetcherManager
@@ -45,6 +47,20 @@ class Partition(val topic: String,
private def isReplicaLocal(replicaId: Int) : Boolean = (replicaId == localBrokerId)
+ newGauge(
+ topic + "-" + partitionId + "UnderReplicated",
+ new Gauge[Int] {
+ def value() = {
+ if (isUnderReplicated) 1 else 0
+ }
+ }
+ )
+
+ def isUnderReplicated(): Boolean = {
+ // TODO: need to pass in replication factor from controller
+ inSyncReplicas.size < replicaManager.config.defaultReplicationFactor
+ }
+
def getOrCreateReplica(replicaId: Int = localBrokerId): Replica = {
val replicaOpt = getReplica(replicaId)
replicaOpt match {
@@ -182,6 +198,7 @@ class Partition(val topic: String,
info("Expanding ISR for topic %s partition %d to %s".format(topic, partitionId, newInSyncReplicas.map(_.brokerId).mkString(",")))
// update ISR in ZK and cache
updateISR(newInSyncReplicas)
+ replicaManager.isrExpandRate.mark()
}
maybeIncrementLeaderHW(leaderReplica)
case None => // nothing to do if no longer leader
@@ -240,6 +257,7 @@ class Partition(val topic: String,
updateISR(newInSyncReplicas)
// we may need to increment high watermark since ISR could be down to 1
maybeIncrementLeaderHW(leaderReplica)
+ replicaManager.isrShrinkRate.mark()
}
case None => // do nothing if no longer leader
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/cluster/Replica.scala Thu Sep 13 04:27:13 2012
@@ -36,7 +36,7 @@ class Replica(val brokerId: Int,
val topic = partition.topic
val partitionId = partition.partitionId
- def logEndOffset_=(newLogEndOffset: Long) = {
+ def logEndOffset_=(newLogEndOffset: Long) {
if (!isLocal) {
logEndOffsetValue.set(newLogEndOffset)
logEndOffsetUpdateTimeMsValue.set(time.milliseconds)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerConnector.scala Thu Sep 13 04:27:13 2012
@@ -18,7 +18,7 @@
package kafka.consumer
import scala.collection._
-import kafka.utils.{Utils, Logging}
+import kafka.utils.Logging
import kafka.serializer.{DefaultDecoder, Decoder}
/**
@@ -64,8 +64,6 @@ trait ConsumerConnector {
}
object Consumer extends Logging {
- private val consumerStatsMBeanName = "kafka:type=kafka.ConsumerStats"
-
/**
* Create a ConsumerConnector
*
@@ -74,7 +72,6 @@ object Consumer extends Logging {
*/
def create(config: ConsumerConfig): ConsumerConnector = {
val consumerConnect = new ZookeeperConsumerConnector(config)
- Utils.registerMBean(consumerConnect, consumerStatsMBeanName)
consumerConnect
}
@@ -86,7 +83,6 @@ object Consumer extends Logging {
*/
def createJavaConsumerConnector(config: ConsumerConfig): kafka.javaapi.consumer.ConsumerConnector = {
val consumerConnect = new kafka.javaapi.consumer.ZookeeperConsumerConnector(config)
- Utils.registerMBean(consumerConnect.underlying, consumerStatsMBeanName)
consumerConnect
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerIterator.scala Thu Sep 13 04:27:13 2012
@@ -47,8 +47,8 @@ class ConsumerIterator[T](private val ch
currentTopicInfo.resetConsumeOffset(consumedOffset)
val topic = currentTopicInfo.topic
trace("Setting %s consumed offset to %d".format(topic, consumedOffset))
- ConsumerTopicStat.getConsumerTopicStat(topic).recordMessagesPerTopic(1)
- ConsumerTopicStat.getConsumerAllTopicStat().recordMessagesPerTopic(1)
+ ConsumerTopicStat.getConsumerTopicStat(topic).messageRate.mark()
+ ConsumerTopicStat.getConsumerAllTopicStat().messageRate.mark()
item
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ConsumerTopicStat.scala Thu Sep 13 04:27:13 2012
@@ -17,44 +17,24 @@
package kafka.consumer
-import java.util.concurrent.atomic.AtomicLong
-import kafka.utils.{Pool, Utils, threadsafe, Logging}
-
-trait ConsumerTopicStatMBean {
- def getMessagesPerTopic: Long
- def getBytesPerTopic: Long
-}
+import kafka.utils.{Pool, threadsafe, Logging}
+import java.util.concurrent.TimeUnit
+import kafka.metrics.KafkaMetricsGroup
@threadsafe
-class ConsumerTopicStat extends ConsumerTopicStatMBean {
- private val numCumulatedMessagesPerTopic = new AtomicLong(0)
- private val numCumulatedBytesPerTopic = new AtomicLong(0)
-
- def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
-
- def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
-
- def getBytesPerTopic: Long = numCumulatedBytesPerTopic.get
-
- def recordBytesPerTopic(nBytes: Long) = numCumulatedBytesPerTopic.getAndAdd(nBytes)
+class ConsumerTopicStat(name: String) extends KafkaMetricsGroup {
+ val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS)
+ val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
}
object ConsumerTopicStat extends Logging {
- private val stats = new Pool[String, ConsumerTopicStat]
- private val allTopicStat = new ConsumerTopicStat
- Utils.registerMBean(allTopicStat, "kafka:type=kafka.ConsumerAllTopicStat")
+ private val valueFactory = (k: String) => new ConsumerTopicStat(k)
+ private val stats = new Pool[String, ConsumerTopicStat](Some(valueFactory))
+ private val allTopicStat = new ConsumerTopicStat("AllTopics")
def getConsumerAllTopicStat(): ConsumerTopicStat = allTopicStat
def getConsumerTopicStat(topic: String): ConsumerTopicStat = {
- var stat = stats.get(topic)
- if (stat == null) {
- stat = new ConsumerTopicStat
- if (stats.putIfNotExists(topic, stat) == null)
- Utils.registerMBean(stat, "kafka:type=kafka.ConsumerTopicStat." + topic)
- else
- stat = stats.get(topic)
- }
- return stat
+ stats.getAndMaybePut(topic + "-")
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/PartitionTopicInfo.scala Thu Sep 13 04:27:13 2012
@@ -21,7 +21,6 @@ import java.util.concurrent._
import java.util.concurrent.atomic._
import kafka.message._
import kafka.utils.Logging
-import kafka.common.ErrorMapping
private[consumer] class PartitionTopicInfo(val topic: String,
val brokerId: Int,
@@ -59,8 +58,8 @@ private[consumer] class PartitionTopicIn
chunkQueue.put(new FetchedDataChunk(messages, this, fetchedOffset.get))
val newOffset = fetchedOffset.addAndGet(size)
debug("updated fetch offset of ( %s ) to %d".format(this, newOffset))
- ConsumerTopicStat.getConsumerTopicStat(topic).recordBytesPerTopic(size)
- ConsumerTopicStat.getConsumerAllTopicStat().recordBytesPerTopic(size)
+ ConsumerTopicStat.getConsumerTopicStat(topic).byteRate.mark(size)
+ ConsumerTopicStat.getConsumerAllTopicStat().byteRate.mark(size)
}
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/SimpleConsumer.scala Thu Sep 13 04:27:13 2012
@@ -21,6 +21,8 @@ import kafka.api._
import kafka.network._
import kafka.utils._
import kafka.common.ErrorMapping
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
/**
* A consumer of kafka messages
@@ -91,15 +93,13 @@ class SimpleConsumer( val host: String,
* @return a set of fetched messages
*/
def fetch(request: FetchRequest): FetchResponse = {
- val startTime = SystemTime.nanoseconds
- val response = sendRequest(request)
+ var response: Receive = null
+ FetchRequestAndResponseStat.requestTimer.time {
+ response = sendRequest(request)
+ }
val fetchResponse = FetchResponse.readFrom(response.buffer)
val fetchedSize = fetchResponse.sizeInBytes
-
- val endTime = SystemTime.nanoseconds
- SimpleConsumerStats.recordFetchRequest(endTime - startTime)
- SimpleConsumerStats.recordConsumptionThroughput(fetchedSize)
-
+ FetchRequestAndResponseStat.respondSizeHist.update(fetchedSize)
fetchResponse
}
@@ -125,39 +125,7 @@ class SimpleConsumer( val host: String,
}
}
-trait SimpleConsumerStatsMBean {
- def getFetchRequestsPerSecond: Double
- def getAvgFetchRequestMs: Double
- def getMaxFetchRequestMs: Double
- def getNumFetchRequests: Long
- def getConsumerThroughput: Double
-}
-
-@threadsafe
-class SimpleConsumerStats(monitoringDurationNs: Long) extends SimpleConsumerStatsMBean {
- private val fetchRequestStats = new SnapshotStats(monitoringDurationNs)
-
- def recordFetchRequest(requestNs: Long) = fetchRequestStats.recordRequestMetric(requestNs)
-
- def recordConsumptionThroughput(data: Long) = fetchRequestStats.recordThroughputMetric(data)
-
- def getFetchRequestsPerSecond: Double = fetchRequestStats.getRequestsPerSecond
-
- def getAvgFetchRequestMs: Double = fetchRequestStats.getAvgMetric / (1000.0 * 1000.0)
-
- def getMaxFetchRequestMs: Double = fetchRequestStats.getMaxMetric / (1000.0 * 1000.0)
-
- def getNumFetchRequests: Long = fetchRequestStats.getNumRequests
-
- def getConsumerThroughput: Double = fetchRequestStats.getThroughput
-}
-
-object SimpleConsumerStats extends Logging {
- private val simpleConsumerstatsMBeanName = "kafka:type=kafka.SimpleConsumerStats"
- private val stats = new SimpleConsumerStats(1 * 1000L * 1000L * 1000L)
- Utils.registerMBean(stats, simpleConsumerstatsMBeanName)
-
- def recordFetchRequest(requestMs: Long) = stats.recordFetchRequest(requestMs)
- def recordConsumptionThroughput(data: Long) = stats.recordConsumptionThroughput(data)
-}
-
+object FetchRequestAndResponseStat extends KafkaMetricsGroup {
+ val requestTimer = new KafkaTimer(newTimer("FetchRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ val respondSizeHist = newHistogram("FetchResponseSize")
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala Thu Sep 13 04:27:13 2012
@@ -32,6 +32,8 @@ import java.util.UUID
import kafka.serializer.Decoder
import kafka.utils.ZkUtils._
import kafka.common.{KafkaException, NoBrokersForPartitionException, ConsumerRebalanceFailedException, InvalidConfigException}
+import com.yammer.metrics.core.Gauge
+import kafka.metrics.KafkaMetricsGroup
/**
@@ -73,21 +75,9 @@ private[kafka] object ZookeeperConsumerC
val shutdownCommand: FetchedDataChunk = new FetchedDataChunk(null, null, -1L)
}
-/**
- * JMX interface for monitoring consumer
- */
-trait ZookeeperConsumerConnectorMBean {
- def getPartOwnerStats: String
- def getConsumerGroup: String
- def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long
- def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long
- def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long
-}
-
private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig,
val enableFetcher: Boolean) // for testing only
- extends ConsumerConnector with ZookeeperConsumerConnectorMBean
- with Logging {
+ extends ConsumerConnector with Logging with KafkaMetricsGroup {
private val isShuttingDown = new AtomicBoolean(false)
private val rebalanceLock = new Object
private var fetcher: Option[ConsumerFetcherManager] = None
@@ -260,58 +250,6 @@ private[kafka] class ZookeeperConsumerCo
}
}
- // for JMX
- def getPartOwnerStats(): String = {
- val builder = new StringBuilder
- for ((topic, infos) <- topicRegistry) {
- builder.append("\n" + topic + ": [")
- val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
- for(partition <- infos.values) {
- builder.append("\n {")
- builder.append{partition}
- builder.append(",fetch offset:" + partition.getFetchOffset)
- builder.append(",consumer offset:" + partition.getConsumeOffset)
- builder.append("}")
- }
- builder.append("\n ]")
- }
- builder.toString
- }
-
- // for JMX
- def getConsumerGroup(): String = config.groupId
-
- def getOffsetLag(topic: String, brokerId: Int, partitionId: Int): Long =
- getLatestOffset(topic, brokerId, partitionId) - getConsumedOffset(topic, brokerId, partitionId)
-
- def getConsumedOffset(topic: String, brokerId: Int, partitionId: Int): Long = {
- val partitionInfos = topicRegistry.get(topic)
- if (partitionInfos != null) {
- val partitionInfo = partitionInfos.get(partitionId)
- if (partitionInfo != null)
- return partitionInfo.getConsumeOffset
- }
-
- // otherwise, try to get it from zookeeper
- try {
- val topicDirs = new ZKGroupTopicDirs(config.groupId, topic)
- val znode = topicDirs.consumerOffsetDir + "/" + partitionId
- val offsetString = readDataMaybeNull(zkClient, znode)._1
- offsetString match {
- case Some(offset) => offset.toLong
- case None => -1L
- }
- }
- catch {
- case e =>
- error("error in getConsumedOffset JMX ", e)
- -2L
- }
- }
-
- def getLatestOffset(topic: String, brokerId: Int, partitionId: Int): Long =
- earliestOrLatestOffset(topic, brokerId, partitionId, OffsetRequest.LatestTime)
-
private def earliestOrLatestOffset(topic: String, brokerId: Int, partitionId: Int, earliestOrLatest: Long): Long = {
var simpleConsumer: SimpleConsumer = null
var producedOffset: Long = -1L
@@ -728,6 +666,12 @@ private[kafka] class ZookeeperConsumerCo
val topicThreadId = e._1
val q = e._2._1
topicThreadIdAndQueues.put(topicThreadId, q)
+ newGauge(
+ config.groupId + "-" + topicThreadId._1 + "-" + topicThreadId._2 + "-FetchQueueSize",
+ new Gauge[Int] {
+ def value() = q.size
+ }
+ )
})
val groupedByTopic = threadQueueStreamPairs.groupBy(_._1._1)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/javaapi/ProducerRequest.scala Thu Sep 13 04:27:13 2012
@@ -24,7 +24,7 @@ class ProducerRequest(val correlationId:
val clientId: String,
val requiredAcks: Short,
val ackTimeoutMs: Int,
- val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.Produce)) {
+ val data: Array[TopicData]) extends RequestOrResponse(Some(RequestKeys.ProduceKey)) {
val underlying = new kafka.api.ProducerRequest(correlationId, clientId, requiredAcks, ackTimeoutMs, data)
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/log/Log.scala Thu Sep 13 04:27:13 2012
@@ -26,6 +26,8 @@ import java.text.NumberFormat
import kafka.server.BrokerTopicStat
import kafka.message.{ByteBufferMessageSet, MessageSet, InvalidMessageException, FileMessageSet}
import kafka.common.{KafkaException, InvalidMessageSizeException, OffsetOutOfRangeException}
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
object Log {
val FileSuffix = ".kafka"
@@ -130,7 +132,7 @@ class LogSegment(val file: File, val mes
@threadsafe
private[kafka] class Log( val dir: File, val maxLogFileSize: Long, val maxMessageSize: Int, val flushInterval: Int,
val rollIntervalMs: Long, val needRecovery: Boolean, time: Time,
- brokerId: Int = 0) extends Logging {
+ brokerId: Int = 0) extends Logging with KafkaMetricsGroup {
this.logIdent = "[Kafka Log on Broker " + brokerId + "], "
import kafka.log.Log._
@@ -147,9 +149,19 @@ private[kafka] class Log( val dir: File,
/* The actual segments of the log */
private[log] val segments: SegmentList[LogSegment] = loadSegments()
- private val logStats = new LogStats(this)
-
- Utils.registerMBean(logStats, "kafka:type=kafka.logs." + dir.getName)
+ newGauge(
+ name + "-" + "NumLogSegments",
+ new Gauge[Int] {
+ def value() = numberOfSegments
+ }
+ )
+
+ newGauge(
+ name + "-" + "LogEndOffset",
+ new Gauge[Long] {
+ def value() = logEndOffset
+ }
+ )
/* The name of this log */
def name = dir.getName()
@@ -243,9 +255,8 @@ private[kafka] class Log( val dir: File,
numberOfMessages += 1;
}
- BrokerTopicStat.getBrokerTopicStat(topicName).recordMessagesIn(numberOfMessages)
- BrokerTopicStat.getBrokerAllTopicStat.recordMessagesIn(numberOfMessages)
- logStats.recordAppendedMessages(numberOfMessages)
+ BrokerTopicStat.getBrokerTopicStat(topicName).messagesInRate.mark(numberOfMessages)
+ BrokerTopicStat.getBrokerAllTopicStat.messagesInRate.mark(numberOfMessages)
// truncate the message set's buffer upto validbytes, before appending it to the on-disk log
val validByteBuffer = messages.buffer.duplicate()
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/message/FileMessageSet.scala Thu Sep 13 04:27:13 2012
@@ -24,6 +24,8 @@ import java.util.concurrent.atomic._
import kafka.utils._
import kafka.common.KafkaException
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
/**
* An on-disk message set. The set can be opened either mutably or immutably. Mutation attempts
@@ -157,10 +159,9 @@ class FileMessageSet private[kafka](priv
*/
def flush() = {
checkMutable()
- val startTime = SystemTime.milliseconds
- channel.force(true)
- val elapsedTime = SystemTime.milliseconds - startTime
- LogFlushStats.recordFlushRequest(elapsedTime)
+ LogFlushStats.logFlushTimer.time {
+ channel.force(true)
+ }
}
/**
@@ -238,38 +239,8 @@ class FileMessageSet private[kafka](priv
else
next
}
-
-}
-
-trait LogFlushStatsMBean {
- def getFlushesPerSecond: Double
- def getAvgFlushMs: Double
- def getTotalFlushMs: Long
- def getMaxFlushMs: Double
- def getNumFlushes: Long
}
-@threadsafe
-class LogFlushStats(monitorDurationNs: Long) extends LogFlushStatsMBean {
- private val flushRequestStats = new SnapshotStats(monitorDurationNs)
-
- def recordFlushRequest(requestMs: Long) = flushRequestStats.recordRequestMetric(requestMs)
-
- def getFlushesPerSecond: Double = flushRequestStats.getRequestsPerSecond
-
- def getAvgFlushMs: Double = flushRequestStats.getAvgMetric
-
- def getTotalFlushMs: Long = flushRequestStats.getTotalMetric
-
- def getMaxFlushMs: Double = flushRequestStats.getMaxMetric
-
- def getNumFlushes: Long = flushRequestStats.getNumRequests
-}
-
-object LogFlushStats extends Logging {
- private val LogFlushStatsMBeanName = "kafka:type=kafka.LogFlushStats"
- private val stats = new LogFlushStats(1L * 1000 * 1000 * 1000)
- Utils.registerMBean(stats, LogFlushStatsMBeanName)
-
- def recordFlushRequest(requestMs: Long) = stats.recordFlushRequest(requestMs)
+object LogFlushStats extends KafkaMetricsGroup {
+ val logFlushTimer = new KafkaTimer(newTimer("LogFlushRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/metrics/KafkaMetricsGroup.scala Thu Sep 13 04:27:13 2012
@@ -27,38 +27,14 @@ import com.yammer.metrics.Metrics
trait KafkaMetricsGroup extends Logging {
/**
- * This method enables the user to form logical sub-groups of this
- * KafkaMetricsGroup by inserting a sub-group identifier in the package
- * string.
- *
- * @return The sub-group identifier.
- */
- def metricsGroupIdent: String = ""
-
- /**
* Creates a new MetricName object for gauges, meters, etc. created for this
- * metrics group. It uses the metricsGroupIdent to create logical sub-groups.
- * This is currently specifically of use to classes under kafka, with
- * broker-id being the most common metrics grouping strategy.
- *
+ * metrics group.
* @param name Descriptive name of the metric.
* @return Sanitized metric name object.
*/
private def metricName(name: String) = {
- val ident = metricsGroupIdent
val klass = this.getClass
- val pkg = {
- val actualPkg = if (klass.getPackage == null) "" else klass.getPackage.getName
- if (ident.nonEmpty) {
- // insert the sub-group identifier after the top-level package
- if (actualPkg.contains("."))
- actualPkg.replaceFirst("""\.""", ".%s.".format(ident))
- else
- actualPkg + "." + ident
- }
- else
- actualPkg
- }
+ val pkg = if (klass.getPackage == null) "" else klass.getPackage.getName
val simpleName = klass.getSimpleName.replaceAll("\\$$", "")
new MetricName(pkg, simpleName, name)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/RequestChannel.scala Thu Sep 13 04:27:13 2012
@@ -19,23 +19,78 @@ package kafka.network
import java.util.concurrent._
import kafka.utils.SystemTime
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import java.nio.ByteBuffer
+import kafka.api._
+
+object RequestChannel {
+ val AllDone = new Request(1, 2, getShutdownReceive(), 0)
+
+ def getShutdownReceive() = {
+ val emptyProducerRequest = new ProducerRequest(0, 0, "", 0, 0, Array[TopicData]())
+ val byteBuffer = ByteBuffer.allocate(emptyProducerRequest.sizeInBytes + 2)
+ byteBuffer.putShort(RequestKeys.ProduceKey)
+ emptyProducerRequest.writeTo(byteBuffer)
+ byteBuffer.rewind()
+ byteBuffer
+ }
+
+ case class Request(processor: Int, requestKey: Any, buffer: ByteBuffer, startTimeNs: Long) {
+ var dequeueTimeNs = -1L
+ var apiLocalCompleteTimeNs = -1L
+ var responseCompleteTimeNs = -1L
+ val requestId = buffer.getShort()
+ val requestObj: RequestOrResponse = RequestKeys.deserializerForKey(requestId)(buffer)
+ buffer.rewind()
+
+ def updateRequestMetrics() {
+ val endTimeNs = SystemTime.nanoseconds
+ val queueTime = (dequeueTimeNs - startTimeNs).max(0L)
+ val apiLocalTime = (apiLocalCompleteTimeNs - dequeueTimeNs).max(0L)
+ val apiRemoteTime = (responseCompleteTimeNs - apiLocalCompleteTimeNs).max(0L)
+ val responseSendTime = (endTimeNs - responseCompleteTimeNs).max(0L)
+ val totalTime = endTimeNs - startTimeNs
+ var metricsList = List(RequestMetrics.metricsMap(RequestKeys.nameForKey(requestId)))
+ if (requestId == RequestKeys.FetchKey) {
+ val isFromFollower = requestObj.asInstanceOf[FetchRequest].isFromFollower
+ metricsList ::= ( if (isFromFollower)
+ RequestMetrics.metricsMap(RequestMetrics.followFetchMetricName)
+ else
+ RequestMetrics.metricsMap(RequestMetrics.consumerFetchMetricName) )
+ }
+ metricsList.foreach{
+ m => m.requestRate.mark()
+ m.queueTimeHist.update(queueTime)
+ m.localTimeHist.update(apiLocalTime)
+ m.remoteTimeHist.update(apiRemoteTime)
+ m.responseSendTimeHist.update(responseSendTime)
+ m.totalTimeHist.update(totalTime)
+ }
+ }
+ }
+
+ case class Response(processor: Int, request: Request, responseSend: Send) {
+ request.responseCompleteTimeNs = SystemTime.nanoseconds
-object RequestChannel {
- val AllDone = new Request(1, 2, null, 0)
- case class Request(processor: Int, requestKey: Any, request: Receive, start: Long)
- case class Response(processor: Int, requestKey: Any, response: Send, start: Long, elapsedNs: Long) {
def this(request: Request, send: Send) =
- this(request.processor, request.requestKey, send, request.start, SystemTime.nanoseconds - request.start)
+ this(request.processor, request, send)
}
}
-class RequestChannel(val numProcessors: Int, val queueSize: Int) {
+class RequestChannel(val numProcessors: Int, val queueSize: Int) extends KafkaMetricsGroup {
private var responseListeners: List[(Int) => Unit] = Nil
private val requestQueue = new ArrayBlockingQueue[RequestChannel.Request](queueSize)
private val responseQueues = new Array[BlockingQueue[RequestChannel.Response]](numProcessors)
for(i <- 0 until numProcessors)
responseQueues(i) = new ArrayBlockingQueue[RequestChannel.Response](queueSize)
-
+
+ newGauge(
+ "RequestQueueSize",
+ new Gauge[Int] {
+ def value() = requestQueue.size
+ }
+ )
/** Send a request to be handled, potentially blocking until there is room in the queue for the request */
def sendRequest(request: RequestChannel.Request) {
@@ -60,5 +115,26 @@ class RequestChannel(val numProcessors:
def addResponseListener(onResponse: Int => Unit) {
responseListeners ::= onResponse
}
+}
+
+object RequestMetrics {
+ val metricsMap = new scala.collection.mutable.HashMap[String, RequestMetrics]
+ val consumerFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Consumer"
+ val followFetchMetricName = RequestKeys.nameForKey(RequestKeys.FetchKey) + "-Follower"
+ (RequestKeys.keyToNameAndDeserializerMap.values.map(e => e._1)
+ ++ List(consumerFetchMetricName, followFetchMetricName)).foreach(name => metricsMap.put(name, new RequestMetrics(name)))
+}
+class RequestMetrics(name: String) extends KafkaMetricsGroup {
+ val requestRate = newMeter(name + "-RequestsPerSec", "requests", TimeUnit.SECONDS)
+ // time a request spent in a request queue
+ val queueTimeHist = newHistogram(name + "-QueueTimeNs")
+ // time a request takes to be processed at the local broker
+ val localTimeHist = newHistogram(name + "-LocalTimeNs")
+ // time a request takes to wait on remote brokers (only relevant to fetch and produce requests)
+ val remoteTimeHist = newHistogram(name + "-RemoteTimeNs")
+ // time to send the response to the requester
+ val responseSendTimeHist = newHistogram(name + "-ResponseSendTimeNs")
+ val totalTimeHist = newHistogram(name + "-TotalTimeNs")
}
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/network/SocketServer.scala Thu Sep 13 04:27:13 2012
@@ -41,7 +41,6 @@ class SocketServer(val brokerId: Int,
private val time = SystemTime
private val processors = new Array[Processor](numProcessorThreads)
private var acceptor: Acceptor = new Acceptor(port, processors)
- val stats: SocketServerStats = new SocketServerStats(1000L * 1000L * 1000L * monitoringPeriodSecs)
val requestChannel = new RequestChannel(numProcessorThreads, maxQueuedRequests)
/**
@@ -49,7 +48,7 @@ class SocketServer(val brokerId: Int,
*/
def startup() {
for(i <- 0 until numProcessorThreads) {
- processors(i) = new Processor(i, time, maxRequestSize, requestChannel, stats)
+ processors(i) = new Processor(i, time, maxRequestSize, requestChannel)
Utils.newThread("kafka-processor-%d-%d".format(port, i), processors(i), false).start()
}
// register the processor threads for notification of responses
@@ -187,8 +186,7 @@ private[kafka] class Acceptor(val port:
private[kafka] class Processor(val id: Int,
val time: Time,
val maxRequestSize: Int,
- val requestChannel: RequestChannel,
- val stats: SocketServerStats) extends AbstractServerThread {
+ val requestChannel: RequestChannel) extends AbstractServerThread {
private val newConnections = new ConcurrentLinkedQueue[SocketChannel]();
@@ -240,10 +238,10 @@ private[kafka] class Processor(val id: I
var curr = requestChannel.receiveResponse(id)
while(curr != null) {
trace("Socket server received response to send, registering for write: " + curr)
- val key = curr.requestKey.asInstanceOf[SelectionKey]
+ val key = curr.request.requestKey.asInstanceOf[SelectionKey]
try {
key.interestOps(SelectionKey.OP_WRITE)
- key.attach(curr.response)
+ key.attach(curr)
} catch {
case e: CancelledKeyException => {
debug("Ignoring response for closed socket.")
@@ -288,18 +286,17 @@ private[kafka] class Processor(val id: I
*/
def read(key: SelectionKey) {
val socketChannel = channelFor(key)
- var request = key.attachment.asInstanceOf[Receive]
+ var receive = key.attachment.asInstanceOf[Receive]
if(key.attachment == null) {
- request = new BoundedByteBufferReceive(maxRequestSize)
- key.attach(request)
+ receive = new BoundedByteBufferReceive(maxRequestSize)
+ key.attach(receive)
}
- val read = request.readFrom(socketChannel)
- stats.recordBytesRead(read)
+ val read = receive.readFrom(socketChannel)
trace(read + " bytes read from " + socketChannel.socket.getRemoteSocketAddress())
if(read < 0) {
close(key)
- } else if(request.complete) {
- val req = RequestChannel.Request(processor = id, requestKey = key, request = request, start = time.nanoseconds)
+ } else if(receive.complete) {
+ val req = RequestChannel.Request(processor = id, requestKey = key, buffer = receive.buffer, startTimeNs = time.nanoseconds)
requestChannel.sendRequest(req)
trace("Recieved request, sending for processing by handler: " + req)
key.attach(null)
@@ -315,13 +312,14 @@ private[kafka] class Processor(val id: I
*/
def write(key: SelectionKey) {
val socketChannel = channelFor(key)
- var response = key.attachment().asInstanceOf[Send]
- if(response == null)
+ val response = key.attachment().asInstanceOf[RequestChannel.Response]
+ val responseSend = response.responseSend
+ if(responseSend == null)
throw new IllegalStateException("Registered for write interest but no response attached to key.")
- val written = response.writeTo(socketChannel)
- stats.recordBytesWritten(written)
+ val written = responseSend.writeTo(socketChannel)
trace(written + " bytes written to " + socketChannel.socket.getRemoteSocketAddress())
- if(response.complete) {
+ if(responseSend.complete) {
+ response.request.updateRequestMetrics()
key.attach(null)
key.interestOps(SelectionKey.OP_READ)
} else {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/Producer.scala Thu Sep 13 04:27:13 2012
@@ -20,8 +20,9 @@ import async.{AsyncProducerStats, Defaul
import kafka.utils._
import java.util.concurrent.{TimeUnit, LinkedBlockingQueue}
import kafka.serializer.Encoder
-import java.util.concurrent.atomic.{AtomicLong, AtomicBoolean}
+import java.util.concurrent.atomic.AtomicBoolean
import kafka.common.{QueueFullException, InvalidConfigException}
+import kafka.metrics.KafkaMetricsGroup
class Producer[K,V](config: ProducerConfig,
private val eventHandler: EventHandler[K,V]) // for testing only
@@ -68,8 +69,10 @@ extends Logging {
}
private def recordStats(producerData: ProducerData[K,V]*) {
- for (data <- producerData)
- ProducerTopicStat.getProducerTopicStat(data.getTopic).recordMessagesPerTopic(data.getData.size)
+ for (data <- producerData) {
+ ProducerTopicStat.getProducerTopicStat(data.getTopic).messageRate.mark(data.getData.size)
+ ProducerTopicStat.getProducerAllTopicStat.messageRate.mark(data.getData.size)
+ }
}
private def asyncSend(producerData: ProducerData[K,V]*) {
@@ -93,7 +96,7 @@ extends Logging {
}
}
if(!added) {
- AsyncProducerStats.recordDroppedEvents
+ AsyncProducerStats.droppedMessageRate.mark()
error("Event queue is full of unsent messages, could not send event: " + data.toString)
throw new QueueFullException("Event queue is full of unsent messages, could not send event: " + data.toString)
}else {
@@ -118,31 +121,27 @@ extends Logging {
}
}
-trait ProducerTopicStatMBean {
- def getMessagesPerTopic: Long
-}
-
@threadsafe
-class ProducerTopicStat extends ProducerTopicStatMBean {
- private val numCumulatedMessagesPerTopic = new AtomicLong(0)
-
- def getMessagesPerTopic: Long = numCumulatedMessagesPerTopic.get
-
- def recordMessagesPerTopic(nMessages: Int) = numCumulatedMessagesPerTopic.getAndAdd(nMessages)
+class ProducerTopicStat(name: String) extends KafkaMetricsGroup {
+ val messageRate = newMeter(name + "MessagesPerSec", "messages", TimeUnit.SECONDS)
+ val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
}
-object ProducerTopicStat extends Logging {
- private val stats = new Pool[String, ProducerTopicStat]
+object ProducerTopicStat {
+ private val valueFactory = (k: String) => new ProducerTopicStat(k)
+ private val stats = new Pool[String, ProducerTopicStat](Some(valueFactory))
+ private val allTopicStat = new ProducerTopicStat("AllTopics")
+
+ def getProducerAllTopicStat(): ProducerTopicStat = allTopicStat
def getProducerTopicStat(topic: String): ProducerTopicStat = {
- var stat = stats.get(topic)
- if (stat == null) {
- stat = new ProducerTopicStat
- if (stats.putIfNotExists(topic, stat) == null)
- Utils.registerMBean(stat, "kafka.producer.Producer:type=kafka.ProducerTopicStat." + topic)
- else
- stat = stats.get(topic)
- }
- return stat
+ stats.getAndMaybePut(topic + "-")
}
}
+
+object ProducerStats extends KafkaMetricsGroup {
+ val serializationErrorRate = newMeter("SerializationErrorsPerSec", "errors", TimeUnit.SECONDS)
+ val resendRate = newMeter( "ResendsPerSec", "resends", TimeUnit.SECONDS)
+ val failedSendRate = newMeter("FailedSendsPerSec", "failed sends", TimeUnit.SECONDS)
+}
+
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/SyncProducer.scala Thu Sep 13 04:27:13 2012
@@ -18,11 +18,12 @@
package kafka.producer
import kafka.api._
-import kafka.message.MessageSet
import kafka.network.{BlockingChannel, BoundedByteBufferSend, Receive}
import kafka.utils._
import java.util.Random
-import kafka.common.{ErrorMapping, MessageSizeTooLargeException}
+import kafka.common.ErrorMapping
+import java.util.concurrent.TimeUnit
+import kafka.metrics.{KafkaTimer, KafkaMetricsGroup}
object SyncProducer {
val RequestKey: Short = 0
@@ -57,7 +58,7 @@ class SyncProducer(val config: SyncProdu
val buffer = new BoundedByteBufferSend(request).buffer
trace("verifying sendbuffer of size " + buffer.limit)
val requestTypeId = buffer.getShort()
- if(requestTypeId == RequestKeys.Produce) {
+ if(requestTypeId == RequestKeys.ProduceKey) {
val request = ProducerRequest.readFrom(buffer)
trace(request.toString)
}
@@ -92,7 +93,6 @@ class SyncProducer(val config: SyncProdu
sentOnConnection = 0
lastConnectionTime = System.currentTimeMillis
}
- SyncProducerStats.recordProduceRequest(SystemTime.nanoseconds - startTime)
response
}
}
@@ -101,7 +101,11 @@ class SyncProducer(val config: SyncProdu
* Send a message
*/
def send(producerRequest: ProducerRequest): ProducerResponse = {
- val response = doSend(producerRequest)
+ ProducerRequestStat.requestSizeHist.update(producerRequest.sizeInBytes)
+ var response: Receive = null
+ ProducerRequestStat.requestTimer.time {
+ response = doSend(producerRequest)
+ }
ProducerResponse.readFrom(response.buffer)
}
@@ -171,34 +175,7 @@ class SyncProducer(val config: SyncProdu
}
}
-trait SyncProducerStatsMBean {
- def getProduceRequestsPerSecond: Double
- def getAvgProduceRequestMs: Double
- def getMaxProduceRequestMs: Double
- def getNumProduceRequests: Long
-}
-
-@threadsafe
-class SyncProducerStats(monitoringDurationNs: Long) extends SyncProducerStatsMBean {
- private val produceRequestStats = new SnapshotStats(monitoringDurationNs)
-
- def recordProduceRequest(requestNs: Long) = produceRequestStats.recordRequestMetric(requestNs)
-
- def getProduceRequestsPerSecond: Double = produceRequestStats.getRequestsPerSecond
-
- def getAvgProduceRequestMs: Double = produceRequestStats.getAvgMetric / (1000.0 * 1000.0)
-
- def getMaxProduceRequestMs: Double = produceRequestStats.getMaxMetric / (1000.0 * 1000.0)
-
- def getNumProduceRequests: Long = produceRequestStats.getNumRequests
-}
-
-object SyncProducerStats extends Logging {
- private val kafkaProducerstatsMBeanName = "kafka:type=kafka.KafkaProducerStats"
- private val stats = new SyncProducerStats(1L * 1000 * 1000 * 1000)
- swallow(Utils.registerMBean(stats, kafkaProducerstatsMBeanName))
-
- def recordProduceRequest(requestMs: Long) = {
- stats.recordProduceRequest(requestMs)
- }
-}
+object ProducerRequestStat extends KafkaMetricsGroup {
+ val requestTimer = new KafkaTimer(newTimer("ProduceRequestRateAndTimeMs", TimeUnit.MILLISECONDS, TimeUnit.SECONDS))
+ val requestSizeHist = newHistogram("ProducerRequestSize")
+}
\ No newline at end of file
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/AsyncProducerStats.scala Thu Sep 13 04:27:13 2012
@@ -17,22 +17,9 @@
package kafka.producer.async
-import java.util.concurrent.atomic.AtomicInteger
-import kafka.utils.Utils
+import kafka.metrics.KafkaMetricsGroup
+import java.util.concurrent.TimeUnit
-class AsyncProducerStats extends AsyncProducerStatsMBean {
- val droppedEvents = new AtomicInteger(0)
-
- def getAsyncProducerDroppedEvents: Int = droppedEvents.get
-
- def recordDroppedEvents = droppedEvents.getAndAdd(1)
-}
-
-object AsyncProducerStats {
- private val stats = new AsyncProducerStats
- val ProducerMBeanName = "kafka.producer.Producer:type=AsyncProducerStats"
-
- Utils.registerMBean(stats, ProducerMBeanName)
-
- def recordDroppedEvents = stats.recordDroppedEvents
+object AsyncProducerStats extends KafkaMetricsGroup {
+ val droppedMessageRate = newMeter("DroppedMessagesPerSec", "drops", TimeUnit.SECONDS)
}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/DefaultEventHandler.scala Thu Sep 13 04:27:13 2012
@@ -24,7 +24,7 @@ import kafka.serializer.Encoder
import kafka.utils.{Utils, Logging}
import scala.collection.Map
import scala.collection.mutable.{ListBuffer, HashMap}
-import kafka.api.{TopicMetadata, ProducerRequest, TopicData, PartitionData}
+import kafka.api._
class DefaultEventHandler[K,V](config: ProducerConfig,
@@ -33,6 +33,7 @@ class DefaultEventHandler[K,V](config: P
private val producerPool: ProducerPool,
private val topicPartitionInfos: HashMap[String, TopicMetadata] = new HashMap[String, TopicMetadata])
extends EventHandler[K,V] with Logging {
+ val isSync = ("sync" == config.producerType)
val brokerPartitionInfo = new BrokerPartitionInfo(config, producerPool, topicPartitionInfos)
@@ -41,6 +42,11 @@ class DefaultEventHandler[K,V](config: P
def handle(events: Seq[ProducerData[K,V]]) {
lock synchronized {
val serializedData = serialize(events)
+ serializedData.foreach{
+ pd => val dataSize = pd.data.foldLeft(0)(_ + _.payloadSize)
+ ProducerTopicStat.getProducerTopicStat(pd.topic).byteRate.mark(dataSize)
+ ProducerTopicStat.getProducerAllTopicStat.byteRate.mark(dataSize)
+ }
var outstandingProduceRequests = serializedData
var remainingRetries = config.producerRetries + 1
while (remainingRetries > 0 && outstandingProduceRequests.size > 0) {
@@ -51,9 +57,11 @@ class DefaultEventHandler[K,V](config: P
// get topics of the outstanding produce requests and refresh metadata for those
Utils.swallowError(brokerPartitionInfo.updateInfo(outstandingProduceRequests.map(_.getTopic)))
remainingRetries -= 1
+ ProducerStats.resendRate.mark()
}
}
if(outstandingProduceRequests.size > 0) {
+ ProducerStats.failedSendRate.mark()
error("Failed to send the following requests: " + outstandingProduceRequests)
throw new FailedToSendMessageException("Failed to send messages after " + config.producerRetries + " tries.", null)
}
@@ -90,7 +98,28 @@ class DefaultEventHandler[K,V](config: P
}
def serialize(events: Seq[ProducerData[K,V]]): Seq[ProducerData[K,Message]] = {
- events.map(e => new ProducerData[K,Message](e.getTopic, e.getKey, e.getData.map(m => encoder.toMessage(m))))
+ val serializedProducerData = new ListBuffer[ProducerData[K,Message]]
+ events.foreach {e =>
+ val serializedMessages = new ListBuffer[Message]
+ for (d <- e.getData) {
+ try {
+ serializedMessages += encoder.toMessage(d)
+ } catch {
+ case t =>
+ ProducerStats.serializationErrorRate.mark()
+ if (isSync)
+ throw t
+ else {
+ // currently, if in async mode, we just log the serialization error. We need to revisit
+ // this when doing kafka-496
+ error("Error serializing message " + t)
+ }
+ }
+ }
+ if (serializedMessages.size > 0)
+ serializedProducerData += new ProducerData[K,Message](e.getTopic, e.getKey, serializedMessages)
+ }
+ serializedProducerData
}
def partitionAndCollate(events: Seq[ProducerData[K,Message]]): Option[Map[Int, Map[(String, Int), Seq[ProducerData[K,Message]]]]] = {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/producer/async/ProducerSendThread.scala Thu Sep 13 04:27:13 2012
@@ -21,16 +21,25 @@ import kafka.utils.{SystemTime, Logging}
import java.util.concurrent.{TimeUnit, CountDownLatch, BlockingQueue}
import collection.mutable.ListBuffer
import kafka.producer.ProducerData
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
class ProducerSendThread[K,V](val threadName: String,
val queue: BlockingQueue[ProducerData[K,V]],
val handler: EventHandler[K,V],
val queueTime: Long,
- val batchSize: Int) extends Thread(threadName) with Logging {
+ val batchSize: Int) extends Thread(threadName) with Logging with KafkaMetricsGroup {
private val shutdownLatch = new CountDownLatch(1)
private val shutdownCommand = new ProducerData[K,V](null, null.asInstanceOf[K], null.asInstanceOf[Seq[V]])
+ newGauge(
+ "ProducerQueueSize-" + getId,
+ new Gauge[Int] {
+ def value() = queue.size
+ }
+ )
+
override def run {
try {
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/AbstractFetcherThread.scala Thu Sep 13 04:27:13 2012
@@ -23,7 +23,11 @@ import kafka.common.ErrorMapping
import collection.mutable
import kafka.message.ByteBufferMessageSet
import kafka.api.{FetchResponse, PartitionData, FetchRequestBuilder}
-import kafka.utils.ShutdownableThread
+import kafka.metrics.KafkaMetricsGroup
+import com.yammer.metrics.core.Gauge
+import java.util.concurrent.atomic.AtomicLong
+import kafka.utils.{Pool, ShutdownableThread}
+import java.util.concurrent.TimeUnit
/**
@@ -35,6 +39,8 @@ abstract class AbstractFetcherThread(na
private val fetchMap = new mutable.HashMap[Tuple2[String,Int], Long] // a (topic, partitionId) -> offset map
private val fetchMapLock = new Object
val simpleConsumer = new SimpleConsumer(sourceBroker.host, sourceBroker.port, socketTimeout, socketBufferSize)
+ val fetcherMetrics = FetcherStat.getFetcherStat(name + "-" + sourceBroker.id)
+
// callbacks to be defined in subclass
// process fetched data
@@ -79,6 +85,7 @@ abstract class AbstractFetcherThread(na
}
}
}
+ fetcherMetrics.requestRate.mark()
if (response != null) {
// process fetched data
@@ -93,8 +100,11 @@ abstract class AbstractFetcherThread(na
partitionData.error match {
case ErrorMapping.NoError =>
processPartitionData(topic, currentOffset.get, partitionData)
- val newOffset = currentOffset.get + partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+ val validBytes = partitionData.messages.asInstanceOf[ByteBufferMessageSet].validBytes
+ val newOffset = currentOffset.get + validBytes
fetchMap.put(key, newOffset)
+ FetcherLagMetrics.getFetcherLagMetrics(topic, partitionId).lag = partitionData.hw - newOffset
+ fetcherMetrics.byteRate.mark(validBytes)
case ErrorMapping.OffsetOutOfRangeCode =>
val newOffset = handleOffsetOutOfRange(topic, partitionId)
fetchMap.put(key, newOffset)
@@ -140,4 +150,43 @@ abstract class AbstractFetcherThread(na
fetchMap.size
}
}
-}
\ No newline at end of file
+}
+
+class FetcherLagMetrics(name: (String, Int)) extends KafkaMetricsGroup {
+ private[this] var lagVal = new AtomicLong(-1L)
+ newGauge(
+ name._1 + "-" + name._2 + "-ConsumerLag",
+ new Gauge[Long] {
+ def value() = lagVal.get
+ }
+ )
+
+ def lag_=(newLag: Long) {
+ lagVal.set(newLag)
+ }
+
+ def lag = lagVal.get
+}
+
+object FetcherLagMetrics {
+ private val valueFactory = (k: (String, Int)) => new FetcherLagMetrics(k)
+ private val stats = new Pool[(String, Int), FetcherLagMetrics](Some(valueFactory))
+
+ def getFetcherLagMetrics(topic: String, partitionId: Int): FetcherLagMetrics = {
+ stats.getAndMaybePut( (topic, partitionId) )
+ }
+}
+
+class FetcherStat(name: String) extends KafkaMetricsGroup {
+ val requestRate = newMeter(name + "RequestsPerSec", "requests", TimeUnit.SECONDS)
+ val byteRate = newMeter(name + "BytesPerSec", "bytes", TimeUnit.SECONDS)
+}
+
+object FetcherStat {
+ private val valueFactory = (k: String) => new FetcherStat(k)
+ private val stats = new Pool[String, FetcherStat](Some(valueFactory))
+
+ def getFetcherStat(name: String): FetcherStat = {
+ stats.getAndMaybePut(name)
+ }
+}
Modified: incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala
URL: http://svn.apache.org/viewvc/incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala?rev=1384202&r1=1384201&r2=1384202&view=diff
==============================================================================
--- incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala (original)
+++ incubator/kafka/branches/0.8/core/src/main/scala/kafka/server/KafkaApis.scala Thu Sep 13 04:27:13 2012
@@ -40,8 +40,6 @@ class KafkaApis(val requestChannel: Requ
val replicaManager: ReplicaManager,
val zkClient: ZkClient,
brokerId: Int) extends Logging {
-
- private val metricsGroup = brokerId.toString
private val producerRequestPurgatory = new ProducerRequestPurgatory(brokerId)
private val fetchRequestPurgatory = new FetchRequestPurgatory(brokerId, requestChannel)
private val delayedRequestMetrics = new DelayedRequestMetrics
@@ -54,20 +52,20 @@ class KafkaApis(val requestChannel: Requ
* Top-level method that handles all requests and multiplexes to the right api
*/
def handle(request: RequestChannel.Request) {
- val apiId = request.request.buffer.getShort()
- apiId match {
- case RequestKeys.Produce => handleProducerRequest(request)
- case RequestKeys.Fetch => handleFetchRequest(request)
- case RequestKeys.Offsets => handleOffsetRequest(request)
- case RequestKeys.TopicMetadata => handleTopicMetadataRequest(request)
- case RequestKeys.LeaderAndISRRequest => handleLeaderAndISRRequest(request)
- case RequestKeys.StopReplicaRequest => handleStopReplicaRequest(request)
- case _ => throw new KafkaException("No mapping found for handler id " + apiId)
+ request.requestId match {
+ case RequestKeys.ProduceKey => handleProducerRequest(request)
+ case RequestKeys.FetchKey => handleFetchRequest(request)
+ case RequestKeys.OffsetsKey => handleOffsetRequest(request)
+ case RequestKeys.MetadataKey => handleTopicMetadataRequest(request)
+ case RequestKeys.LeaderAndIsrKey => handleLeaderAndISRRequest(request)
+ case RequestKeys.StopReplicaKey => handleStopReplicaRequest(request)
+ case requestId => throw new KafkaException("No mapping found for handler id " + requestId)
}
+ request.apiLocalCompleteTimeNs = SystemTime.nanoseconds
}
def handleLeaderAndISRRequest(request: RequestChannel.Request){
- val leaderAndISRRequest = LeaderAndIsrRequest.readFrom(request.request.buffer)
+ val leaderAndISRRequest = request.requestObj.asInstanceOf[LeaderAndIsrRequest]
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling leader and isr request " + leaderAndISRRequest)
trace("Handling leader and isr request " + leaderAndISRRequest)
@@ -79,7 +77,7 @@ class KafkaApis(val requestChannel: Requ
def handleStopReplicaRequest(request: RequestChannel.Request){
- val stopReplicaRequest = StopReplicaRequest.readFrom(request.request.buffer)
+ val stopReplicaRequest = request.requestObj.asInstanceOf[StopReplicaRequest]
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling stop replica request " + stopReplicaRequest)
trace("Handling stop replica request " + stopReplicaRequest)
@@ -107,11 +105,6 @@ class KafkaApis(val requestChannel: Requ
for(fetchReq <- satisfied) {
val topicData = readMessageSets(fetchReq.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, fetchReq.fetch.correlationId, topicData)
-
- val fromFollower = fetchReq.fetch.replicaId != FetchRequest.NonFollowerId
- delayedRequestMetrics.recordDelayedFetchSatisfied(
- fromFollower, SystemTime.nanoseconds - fetchReq.creationTimeNs, response)
-
requestChannel.sendResponse(new RequestChannel.Response(fetchReq.request, new FetchResponseSend(response)))
}
}
@@ -120,7 +113,7 @@ class KafkaApis(val requestChannel: Requ
* Handle a produce request
*/
def handleProducerRequest(request: RequestChannel.Request) {
- val produceRequest = ProducerRequest.readFrom(request.request.buffer)
+ val produceRequest = request.requestObj.asInstanceOf[ProducerRequest]
val sTime = SystemTime.milliseconds
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling producer request " + request.toString)
@@ -179,8 +172,8 @@ class KafkaApis(val requestChannel: Requ
for(topicData <- request.data) {
for(partitionData <- topicData.partitionDataArray) {
msgIndex += 1
- BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordBytesIn(partitionData.messages.sizeInBytes)
- BrokerTopicStat.getBrokerAllTopicStat.recordBytesIn(partitionData.messages.sizeInBytes)
+ BrokerTopicStat.getBrokerTopicStat(topicData.topic).bytesInRate.mark(partitionData.messages.sizeInBytes)
+ BrokerTopicStat.getBrokerAllTopicStat.bytesInRate.mark(partitionData.messages.sizeInBytes)
try {
val localReplica = replicaManager.getLeaderReplicaIfLocal(topicData.topic, partitionData.partition)
val log = localReplica.log.get
@@ -193,8 +186,8 @@ class KafkaApis(val requestChannel: Requ
.format(partitionData.messages.sizeInBytes, offsets(msgIndex)))
} catch {
case e =>
- BrokerTopicStat.getBrokerTopicStat(topicData.topic).recordFailedProduceRequest
- BrokerTopicStat.getBrokerAllTopicStat.recordFailedProduceRequest
+ BrokerTopicStat.getBrokerTopicStat(topicData.topic).failedProduceRequestRate.mark()
+ BrokerTopicStat.getBrokerAllTopicStat.failedProduceRequestRate.mark()
error("Error processing ProducerRequest on %s:%d".format(topicData.topic, partitionData.partition), e)
e match {
case _: IOException =>
@@ -214,7 +207,7 @@ class KafkaApis(val requestChannel: Requ
* Handle a fetch request
*/
def handleFetchRequest(request: RequestChannel.Request) {
- val fetchRequest = FetchRequest.readFrom(request.request.buffer)
+ val fetchRequest = request.requestObj.asInstanceOf[FetchRequest]
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling fetch request " + fetchRequest.toString)
trace("Handling fetch request " + fetchRequest.toString)
@@ -229,7 +222,7 @@ class KafkaApis(val requestChannel: Requ
requestChannel.sendResponse(channelResponse)
}
- if(fetchRequest.replicaId != FetchRequest.NonFollowerId) {
+ if(fetchRequest.isFromFollower) {
maybeUpdatePartitionHW(fetchRequest)
// after updating HW, some delayed produce requests may be unblocked
var satisfiedProduceRequests = new mutable.ArrayBuffer[DelayedProduce]
@@ -272,7 +265,7 @@ class KafkaApis(val requestChannel: Requ
debug("Fetching log for topic %s partition %d".format(offsetDetail.topic, offsetDetail.partitions(i)))
try {
val leader = replicaManager.getLeaderReplicaIfLocal(offsetDetail.topic, offsetDetail.partitions(i))
- val end = if(fetchRequest.replicaId == FetchRequest.NonFollowerId) {
+ val end = if (!fetchRequest.isFromFollower) {
leader.highWatermark
} else {
leader.logEndOffset
@@ -317,12 +310,12 @@ class KafkaApis(val requestChannel: Requ
val topic = offsetDetail.topic
val (partitions, offsets, fetchSizes) = (offsetDetail.partitions, offsetDetail.offsets, offsetDetail.fetchSizes)
for( (partition, offset, fetchSize) <- (partitions, offsets, fetchSizes).zipped.map((_,_,_)) ) {
- val isFetchFromFollower = fetchRequest.replicaId != FetchRequest.NonFollowerId
+ val isFetchFromFollower = fetchRequest.isFromFollower()
val partitionInfo =
try {
val (messages, highWatermark) = readMessageSet(topic, partition, offset, fetchSize, isFetchFromFollower)
- BrokerTopicStat.getBrokerTopicStat(topic).recordBytesOut(messages.sizeInBytes)
- BrokerTopicStat.getBrokerAllTopicStat.recordBytesOut(messages.sizeInBytes)
+ BrokerTopicStat.getBrokerTopicStat(topic).bytesOutRate.mark(messages.sizeInBytes)
+ BrokerTopicStat.getBrokerAllTopicStat.bytesOutRate.mark(messages.sizeInBytes)
if (!isFetchFromFollower) {
new PartitionData(partition, ErrorMapping.NoError, offset, highWatermark, messages)
} else {
@@ -335,8 +328,8 @@ class KafkaApis(val requestChannel: Requ
}
catch {
case e =>
- BrokerTopicStat.getBrokerTopicStat(topic).recordFailedFetchRequest
- BrokerTopicStat.getBrokerAllTopicStat.recordFailedFetchRequest
+ BrokerTopicStat.getBrokerTopicStat(topic).failedFetchRequestRate.mark()
+ BrokerTopicStat.getBrokerAllTopicStat.failedFetchRequestRate.mark()
error("error when processing request " + (topic, partition, offset, fetchSize), e)
new PartitionData(partition, ErrorMapping.codeFor(e.getClass.asInstanceOf[Class[Throwable]]),
offset, -1L, MessageSet.Empty)
@@ -375,7 +368,7 @@ class KafkaApis(val requestChannel: Requ
* Service the offset request API
*/
def handleOffsetRequest(request: RequestChannel.Request) {
- val offsetRequest = OffsetRequest.readFrom(request.request.buffer)
+ val offsetRequest = request.requestObj.asInstanceOf[OffsetRequest]
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling offset request " + offsetRequest.toString)
trace("Handling offset request " + offsetRequest.toString)
@@ -402,7 +395,7 @@ class KafkaApis(val requestChannel: Requ
* Service the topic metadata request API
*/
def handleTopicMetadataRequest(request: RequestChannel.Request) {
- val metadataRequest = TopicMetadataRequest.readFrom(request.request.buffer)
+ val metadataRequest = request.requestObj.asInstanceOf[TopicMetadataRequest]
if(requestLogger.isTraceEnabled)
requestLogger.trace("Handling topic metadata request " + metadataRequest.toString())
trace("Handling topic metadata request " + metadataRequest.toString())
@@ -456,7 +449,7 @@ class KafkaApis(val requestChannel: Requ
def keyLabel: String
}
private [kafka] object MetricKey {
- val globalLabel = "all"
+ val globalLabel = "All"
}
private [kafka] case class RequestKey(topic: String, partition: Int)
@@ -476,7 +469,6 @@ class KafkaApis(val requestChannel: Requ
this.logIdent = "[FetchRequestPurgatory-%d], ".format(brokerId)
-
/**
* A fetch request is satisfied when it has accumulated enough data to meet the min_bytes field
*/
@@ -489,8 +481,8 @@ class KafkaApis(val requestChannel: Requ
def expire(delayed: DelayedFetch) {
val topicData = readMessageSets(delayed.fetch)
val response = new FetchResponse(FetchRequest.CurrentVersion, delayed.fetch.correlationId, topicData)
- val fromFollower = delayed.fetch.replicaId != FetchRequest.NonFollowerId
- delayedRequestMetrics.recordDelayedFetchExpired(fromFollower, response)
+ val fromFollower = delayed.fetch.isFromFollower
+ delayedRequestMetrics.recordDelayedFetchExpired(fromFollower)
requestChannel.sendResponse(new RequestChannel.Response(delayed.request, new FetchResponseSend(response)))
}
}
@@ -560,7 +552,6 @@ class KafkaApis(val requestChannel: Requ
val partitionId = followerFetchRequestKey.partition
val key = RequestKey(topic, partitionId)
val fetchPartitionStatus = partitionStatus(key)
- val durationNs = SystemTime.nanoseconds - creationTimeNs
trace("Checking producer request satisfaction for %s-%d, acksPending = %b"
.format(topic, partitionId, fetchPartitionStatus.acksPending))
if (fetchPartitionStatus.acksPending) {
@@ -576,17 +567,12 @@ class KafkaApis(val requestChannel: Requ
if (!fetchPartitionStatus.acksPending) {
val topicData = produce.data.find(_.topic == topic).get
val partitionData = topicData.partitionDataArray.find(_.partition == partitionId).get
- delayedRequestMetrics.recordDelayedProducerKeyCaughtUp(key,
- durationNs,
- partitionData.sizeInBytes)
maybeUnblockDelayedFetchRequests(topic, Array(partitionData))
}
}
// unblocked if there are no partitions with pending acks
val satisfied = ! partitionStatus.exists(p => p._2.acksPending)
- if (satisfied)
- delayedRequestMetrics.recordDelayedProduceSatisfied(durationNs)
satisfied
}
@@ -629,53 +615,18 @@ class KafkaApis(val requestChannel: Requ
private class DelayedRequestMetrics {
private class DelayedProducerRequestMetrics(keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
- val caughtUpFollowerFetchRequestMeter =
- newMeter("CaughtUpFollowerFetchRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
- val followerCatchUpTimeHistogram = if (keyLabel == MetricKey.globalLabel)
- Some(newHistogram("FollowerCatchUpTimeInNs", biased = true))
- else None
-
- /*
- * Note that throughput is updated on individual key satisfaction.
- * Therefore, it is an upper bound on throughput since the
- * DelayedProducerRequest may get expired.
- */
- val throughputMeter = newMeter("Throughput-" + keyLabel, "bytes", TimeUnit.SECONDS)
- val expiredRequestMeter = newMeter("ExpiredRequestsPerSecond-" + keyLabel, "requests", TimeUnit.SECONDS)
-
- val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
- Some(newMeter("SatisfiedRequestsPerSecond", "requests", TimeUnit.SECONDS))
- else None
- val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
- Some(newHistogram("SatisfactionTimeInNs", biased = true))
- else None
+ val expiredRequestMeter = newMeter(keyLabel + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
}
- private class DelayedFetchRequestMetrics(forFollower: Boolean,
- keyLabel: String = MetricKey.globalLabel) extends KafkaMetricsGroup {
- private val metricPrefix = if (forFollower) "Follower" else "NonFollower"
-
- val satisfiedRequestMeter = if (keyLabel == MetricKey.globalLabel)
- Some(newMeter(metricPrefix + "-SatisfiedRequestsPerSecond",
- "requests", TimeUnit.SECONDS))
- else None
-
- val satisfactionTimeHistogram = if (keyLabel == MetricKey.globalLabel)
- Some(newHistogram(metricPrefix + "-SatisfactionTimeInNs", biased = true))
- else None
-
- val expiredRequestMeter = if (keyLabel == MetricKey.globalLabel)
- Some(newMeter(metricPrefix + "-ExpiredRequestsPerSecond",
- "requests", TimeUnit.SECONDS))
- else None
+ private class DelayedFetchRequestMetrics(forFollower: Boolean) extends KafkaMetricsGroup {
+ private val metricPrefix = if (forFollower) "Follower" else "Consumer"
- val throughputMeter = newMeter("%s-Throughput-%s".format(metricPrefix, keyLabel),
- "bytes", TimeUnit.SECONDS)
+ val expiredRequestMeter = newMeter(metricPrefix + "ExpiresPerSecond", "requests", TimeUnit.SECONDS)
}
private val producerRequestMetricsForKey = {
- val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel)
+ val valueFactory = (k: MetricKey) => new DelayedProducerRequestMetrics(k.keyLabel + "-")
new Pool[MetricKey, DelayedProducerRequestMetrics](Some(valueFactory))
}
@@ -684,74 +635,16 @@ class KafkaApis(val requestChannel: Requ
private val aggregateFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = true)
private val aggregateNonFollowerFetchRequestMetrics = new DelayedFetchRequestMetrics(forFollower = false)
- private val followerFetchRequestMetricsForKey = {
- val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = true, k.keyLabel)
- new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
- }
-
- private val nonFollowerFetchRequestMetricsForKey = {
- val valueFactory = (k: MetricKey) => new DelayedFetchRequestMetrics(forFollower = false, k.keyLabel)
- new Pool[MetricKey, DelayedFetchRequestMetrics](Some(valueFactory))
- }
-
def recordDelayedProducerKeyExpired(key: MetricKey) {
val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
List(keyMetrics, aggregateProduceRequestMetrics).foreach(_.expiredRequestMeter.mark())
}
-
- def recordDelayedProducerKeyCaughtUp(key: MetricKey, timeToCatchUpNs: Long, bytes: Int) {
- val keyMetrics = producerRequestMetricsForKey.getAndMaybePut(key)
- List(keyMetrics, aggregateProduceRequestMetrics).foreach(m => {
- m.caughtUpFollowerFetchRequestMeter.mark()
- m.followerCatchUpTimeHistogram.foreach(_.update(timeToCatchUpNs))
- m.throughputMeter.mark(bytes)
- })
- }
-
-
- def recordDelayedProduceSatisfied(timeToSatisfyNs: Long) {
- aggregateProduceRequestMetrics.satisfiedRequestMeter.foreach(_.mark())
- aggregateProduceRequestMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
- }
-
- private def recordDelayedFetchThroughput(forFollower: Boolean, response: FetchResponse) {
- val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
- else aggregateNonFollowerFetchRequestMetrics
- metrics.throughputMeter.mark(response.sizeInBytes)
-
- response.topicMap.foreach(topicAndData => {
- val topic = topicAndData._1
- topicAndData._2.partitionDataArray.foreach(partitionData => {
- val key = RequestKey(topic, partitionData.partition)
- val keyMetrics = if (forFollower)
- followerFetchRequestMetricsForKey.getAndMaybePut(key)
- else
- nonFollowerFetchRequestMetricsForKey.getAndMaybePut(key)
- keyMetrics.throughputMeter.mark(partitionData.sizeInBytes)
- })
- })
- }
-
-
- def recordDelayedFetchExpired(forFollower: Boolean, response: FetchResponse) {
+ def recordDelayedFetchExpired(forFollower: Boolean) {
val metrics = if (forFollower) aggregateFollowerFetchRequestMetrics
else aggregateNonFollowerFetchRequestMetrics
- metrics.expiredRequestMeter.foreach(_.mark())
-
- recordDelayedFetchThroughput(forFollower, response)
- }
-
-
- def recordDelayedFetchSatisfied(forFollower: Boolean, timeToSatisfyNs: Long, response: FetchResponse) {
- val aggregateMetrics = if (forFollower) aggregateFollowerFetchRequestMetrics
- else aggregateNonFollowerFetchRequestMetrics
-
- aggregateMetrics.satisfactionTimeHistogram.foreach(_.update(timeToSatisfyNs))
- aggregateMetrics.satisfiedRequestMeter.foreach(_.mark())
-
- recordDelayedFetchThroughput(forFollower, response)
+ metrics.expiredRequestMeter.mark()
}
}
}