You are viewing a plain-text version of this content. (The link to the canonical version was lost in this plain-text rendering.)
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/02/25 19:59:38 UTC

kafka git commit: KAFKA-1953; KAFKA-1962; Disambiguate purgatory metrics; restore delayed request metrics; reviewed by Guozhang Wang

Repository: kafka
Updated Branches:
  refs/heads/trunk 10311c138 -> b8904e961


KAFKA-1953; KAFKA-1962; Disambiguate purgatory metrics; restore delayed request metrics; reviewed by Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8904e96
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8904e96
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8904e96

Branch: refs/heads/trunk
Commit: b8904e9614b4c1f11b8487e7a4b88b1e37e1f20b
Parents: 10311c1
Author: Joel Koshy <jj...@gmail.com>
Authored: Wed Feb 25 10:59:28 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Wed Feb 25 10:59:28 2015 -0800

----------------------------------------------------------------------
 .../kafka/coordinator/ConsumerCoordinator.scala |  6 ++--
 .../kafka/coordinator/DelayedHeartbeat.scala    |  4 +++
 .../kafka/coordinator/DelayedJoinGroup.scala    |  4 +++
 .../kafka/coordinator/DelayedRebalance.scala    |  4 +++
 .../main/scala/kafka/server/DelayedFetch.scala  | 20 ++++++++++++-
 .../scala/kafka/server/DelayedOperation.scala   | 22 ++++++++++----
 .../scala/kafka/server/DelayedProduce.scala     | 31 ++++++++++++++++++++
 .../scala/kafka/server/ReplicaManager.scala     |  9 ++++--
 .../kafka/server/DelayedOperationTest.scala     |  6 +++-
 9 files changed, 93 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 01cf1d9..21790a5 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -71,9 +71,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
     latestHeartbeatBucketEndMs = SystemTime.milliseconds
 
     // Initialize purgatories for delayed heartbeat, join-group and rebalance operations
-    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId)
-    joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId)
-    rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId)
+    heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](purgatoryName = "Heartbeat", brokerId = config.brokerId)
+    joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](purgatoryName = "JoinGroup", brokerId = config.brokerId)
+    rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](purgatoryName = "Rebalance", brokerId = config.brokerId)
 
   }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
index 894d6ed..b1248e9 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -36,6 +36,10 @@ class DelayedHeartbeat(sessionTimeout: Long,
     throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket")
   }
 
+  override def onExpiration() {
+    // TODO
+  }
+
   /* mark all consumers within the heartbeat as heartbeat timed out */
   override def onComplete() {
     for (registry <- bucket.consumerRegistryList)

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
index 445bfa1..df60cbc 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
@@ -35,6 +35,10 @@ class DelayedJoinGroup(sessionTimeout: Long,
     forceComplete()
   }
 
+  override def onExpiration() {
+    // TODO
+  }
+
   /* always assume the partition is already assigned as this delayed operation should never time-out */
   override def onComplete() {
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
index b3b3749..8defa2e 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
@@ -50,6 +50,10 @@ class DelayedRebalance(sessionTimeout: Long,
       false
   }
 
+  override def onExpiration() {
+    // TODO
+  }
+
   /* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */
   override def onComplete() {
     groupRegistry.memberRegistries.values.foreach(consumerRegistry =>

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index dd602ee..de6cf5b 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -17,11 +17,14 @@
 
 package kafka.server
 
+import java.util.concurrent.TimeUnit
+
 import kafka.api.FetchResponsePartitionData
 import kafka.api.PartitionFetchInfo
 import kafka.common.UnknownTopicOrPartitionException
 import kafka.common.NotLeaderForPartitionException
 import kafka.common.TopicAndPartition
+import kafka.metrics.KafkaMetricsGroup
 
 import scala.collection._
 
@@ -37,6 +40,7 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf
 case class FetchMetadata(fetchMinBytes: Int,
                          fetchOnlyLeader: Boolean,
                          fetchOnlyCommitted: Boolean,
+                         isFromFollower: Boolean,
                          fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) {
 
   override def toString = "[minBytes: " + fetchMinBytes + ", " +
@@ -109,6 +113,13 @@ class DelayedFetch(delayMs: Long,
       false
   }
 
+  override def onExpiration() {
+    if (fetchMetadata.isFromFollower)
+      DelayedFetchMetrics.followerExpiredRequestMeter.mark()
+    else
+      DelayedFetchMetrics.consumerExpiredRequestMeter.mark()
+  }
+
   /**
    * Upon completion, read whatever data is available and pass to the complete callback
    */
@@ -122,4 +133,11 @@ class DelayedFetch(delayMs: Long,
 
     responseCallback(fetchPartitionData)
   }
-}
\ No newline at end of file
+}
+
+object DelayedFetchMetrics extends KafkaMetricsGroup {
+  private val FetcherTypeKey = "fetcherType"
+  val followerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "follower"))
+  val consumerExpiredRequestMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map(FetcherTypeKey -> "consumer"))
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 1d11099..e317676 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -71,6 +71,11 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) {
   def isCompleted(): Boolean = completed.get()
 
   /**
+   * Call-back to execute when a delayed operation expires, but before completion.
+   */
+  def onExpiration(): Unit
+
+  /**
    * Process for completing an operation; This function needs to be defined
    * in subclasses and will be called exactly once in forceComplete()
    */
@@ -89,7 +94,7 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) {
 /**
  * A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
  */
-class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeInterval: Int = 1000)
+class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000)
         extends Logging with KafkaMetricsGroup {
 
   /* a list of operation watching keys */
@@ -98,18 +103,22 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI
   /* background thread expiring operations that have timed out */
   private val expirationReaper = new ExpiredOperationReaper
 
+  private val metricsTags = Map("delayedOperation" -> purgatoryName)
+
   newGauge(
     "PurgatorySize",
     new Gauge[Int] {
       def value = watched()
-    }
+    },
+    metricsTags
   )
 
   newGauge(
     "NumDelayedOperations",
     new Gauge[Int] {
       def value = delayed()
-    }
+    },
+    metricsTags
   )
 
   expirationReaper.start()
@@ -283,9 +292,12 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI
       val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)
       if (curr != null.asInstanceOf[T]) {
         // if there is an expired operation, try to force complete it
-        if (curr synchronized curr.forceComplete()) {
-          debug("Force complete expired delayed operation %s".format(curr))
+        val completedByMe: Boolean = curr synchronized {
+          curr.onExpiration()
+          curr.forceComplete()
         }
+        if (completedByMe)
+          debug("Force complete expired delayed operation %s".format(curr))
       }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index c229088..4d763bf 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -18,9 +18,14 @@
 package kafka.server
 
 
+import java.util.concurrent.TimeUnit
+
+import com.yammer.metrics.core.Meter
 import kafka.api.ProducerResponseStatus
 import kafka.common.ErrorMapping
 import kafka.common.TopicAndPartition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
 
 import scala.collection._
 
@@ -110,6 +115,14 @@ class DelayedProduce(delayMs: Long,
       false
   }
 
+  override def onExpiration() {
+    produceMetadata.produceStatus.foreach { case (topicPartition, status) =>
+      if (status.acksPending) {
+        DelayedProduceMetrics.recordExpiration(topicPartition)
+      }
+    }
+  }
+
   /**
    * Upon completion, return the current response status along with the error code per partition
    */
@@ -118,3 +131,21 @@ class DelayedProduce(delayMs: Long,
     responseCallback(responseStatus)
   }
 }
+
+object DelayedProduceMetrics extends KafkaMetricsGroup {
+
+  private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
+
+  private val partitionExpirationMeterFactory = (key: TopicAndPartition) =>
+    newMeter("ExpiresPerSec",
+             "requests",
+             TimeUnit.SECONDS,
+             tags = Map("topic" -> key.topic, "partition" -> key.partition.toString))
+  private val partitionExpirationMeters = new Pool[TopicAndPartition, Meter](valueFactory = Some(partitionExpirationMeterFactory))
+
+  def recordExpiration(partition: TopicAndPartition) {
+    aggregateExpirationMeter.mark()
+    partitionExpirationMeters.getAndMaybePut(partition).mark()
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b82ff55..586cf4c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -82,8 +82,10 @@ class ReplicaManager(val config: KafkaConfig,
   this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
   val stateChangeLogger = KafkaController.stateChangeLogger
 
-  val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
-  val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
+  val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](
+    purgatoryName = "Produce", config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
+  val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
+    purgatoryName = "Fetch", config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
 
 
   newGauge(
@@ -392,6 +394,7 @@ class ReplicaManager(val config: KafkaConfig,
                     fetchInfo: Map[TopicAndPartition, PartitionFetchInfo],
                     responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
 
+    val isFromFollower = replicaId >= 0
     val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
     val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
 
@@ -421,7 +424,7 @@ class ReplicaManager(val config: KafkaConfig,
       val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
         (topicAndPartition, FetchPartitionStatus(result.info.fetchOffset, fetchInfo.get(topicAndPartition).get))
       }
-      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, fetchPartitionStatus)
+      val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
       val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
 
       // create a list of (topic, partition) pairs to use as keys for this delayed fetch operation

http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 93f52d3..7a37617 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -28,7 +28,7 @@ class DelayedOperationTest extends JUnit3Suite {
   
   override def setUp() {
     super.setUp()
-    purgatory = new DelayedOperationPurgatory[MockDelayedOperation](0, 5)
+    purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", 0, 5)
   }
   
   override def tearDown() {
@@ -114,6 +114,10 @@ class DelayedOperationTest extends JUnit3Suite {
         false
     }
 
+    override def onExpiration() {
+
+    }
+
     override def onComplete() {
       synchronized {
         notify()