You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jj...@apache.org on 2015/02/25 19:59:38 UTC
kafka git commit: KAFKA-1953; KAFKA-1962; Disambiguate purgatory metrics; restore delayed request metrics; reviewed by Guozhang Wang
Repository: kafka
Updated Branches:
refs/heads/trunk 10311c138 -> b8904e961
KAFKA-1953; KAFKA-1962; Disambiguate purgatory metrics; restore delayed request metrics; reviewed by Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/b8904e96
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/b8904e96
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/b8904e96
Branch: refs/heads/trunk
Commit: b8904e9614b4c1f11b8487e7a4b88b1e37e1f20b
Parents: 10311c1
Author: Joel Koshy <jj...@gmail.com>
Authored: Wed Feb 25 10:59:28 2015 -0800
Committer: Joel Koshy <jj...@gmail.com>
Committed: Wed Feb 25 10:59:28 2015 -0800
----------------------------------------------------------------------
.../kafka/coordinator/ConsumerCoordinator.scala | 6 ++--
.../kafka/coordinator/DelayedHeartbeat.scala | 4 +++
.../kafka/coordinator/DelayedJoinGroup.scala | 4 +++
.../kafka/coordinator/DelayedRebalance.scala | 4 +++
.../main/scala/kafka/server/DelayedFetch.scala | 20 ++++++++++++-
.../scala/kafka/server/DelayedOperation.scala | 22 ++++++++++----
.../scala/kafka/server/DelayedProduce.scala | 31 ++++++++++++++++++++
.../scala/kafka/server/ReplicaManager.scala | 9 ++++--
.../kafka/server/DelayedOperationTest.scala | 6 +++-
9 files changed, 93 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
index 01cf1d9..21790a5 100644
--- a/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/ConsumerCoordinator.scala
@@ -71,9 +71,9 @@ class ConsumerCoordinator(val config: KafkaConfig,
latestHeartbeatBucketEndMs = SystemTime.milliseconds
// Initialize purgatories for delayed heartbeat, join-group and rebalance operations
- heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](config.brokerId)
- joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](config.brokerId)
- rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](config.brokerId)
+ heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat](purgatoryName = "Heartbeat", brokerId = config.brokerId)
+ joinGroupPurgatory = new DelayedOperationPurgatory[DelayedJoinGroup](purgatoryName = "JoinGroup", brokerId = config.brokerId)
+ rebalancePurgatory = new DelayedOperationPurgatory[DelayedRebalance](purgatoryName = "Rebalance", brokerId = config.brokerId)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
index 894d6ed..b1248e9 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedHeartbeat.scala
@@ -36,6 +36,10 @@ class DelayedHeartbeat(sessionTimeout: Long,
throw new IllegalStateException("Delayed heartbeat purgatory should never try to complete any bucket")
}
+ override def onExpiration(): Unit = {
+ // TODO: expirations of delayed heartbeats are not recorded anywhere yet
+ }
+
/* mark all consumers within the heartbeat as heartbeat timed out */
override def onComplete() {
for (registry <- bucket.consumerRegistryList)
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
index 445bfa1..df60cbc 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedJoinGroup.scala
@@ -35,6 +35,10 @@ class DelayedJoinGroup(sessionTimeout: Long,
forceComplete()
}
+ override def onExpiration(): Unit = {
+ // TODO: expirations of delayed join-group operations are not recorded anywhere yet
+ }
+
/* always assume the partition is already assigned as this delayed operation should never time-out */
override def onComplete() {
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
index b3b3749..8defa2e 100644
--- a/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
+++ b/core/src/main/scala/kafka/coordinator/DelayedRebalance.scala
@@ -50,6 +50,10 @@ class DelayedRebalance(sessionTimeout: Long,
false
}
+ override def onExpiration(): Unit = {
+ // TODO: expirations of delayed rebalances are not recorded anywhere yet
+ }
+
/* mark consumers that have not re-joined group as failed and proceed to rebalance the rest of the group */
override def onComplete() {
groupRegistry.memberRegistries.values.foreach(consumerRegistry =>
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/server/DelayedFetch.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedFetch.scala b/core/src/main/scala/kafka/server/DelayedFetch.scala
index dd602ee..de6cf5b 100644
--- a/core/src/main/scala/kafka/server/DelayedFetch.scala
+++ b/core/src/main/scala/kafka/server/DelayedFetch.scala
@@ -17,11 +17,14 @@
package kafka.server
+import java.util.concurrent.TimeUnit
+
import kafka.api.FetchResponsePartitionData
import kafka.api.PartitionFetchInfo
import kafka.common.UnknownTopicOrPartitionException
import kafka.common.NotLeaderForPartitionException
import kafka.common.TopicAndPartition
+import kafka.metrics.KafkaMetricsGroup
import scala.collection._
@@ -37,6 +40,7 @@ case class FetchPartitionStatus(startOffsetMetadata: LogOffsetMetadata, fetchInf
case class FetchMetadata(fetchMinBytes: Int,
fetchOnlyLeader: Boolean,
fetchOnlyCommitted: Boolean,
+ isFromFollower: Boolean,
fetchPartitionStatus: Map[TopicAndPartition, FetchPartitionStatus]) {
override def toString = "[minBytes: " + fetchMinBytes + ", " +
@@ -109,6 +113,13 @@ class DelayedFetch(delayMs: Long,
false
}
+ override def onExpiration() {
+ // follower (replica) fetches and consumer fetches expire into separate meters
+ val expiredMeter = if (fetchMetadata.isFromFollower) DelayedFetchMetrics.followerExpiredRequestMeter
+ else DelayedFetchMetrics.consumerExpiredRequestMeter
+ expiredMeter.mark()
+ }
+
/**
* Upon completion, read whatever data is available and pass to the complete callback
*/
@@ -122,4 +133,11 @@ class DelayedFetch(delayMs: Long,
responseCallback(fetchPartitionData)
}
-}
\ No newline at end of file
+}
+
+object DelayedFetchMetrics extends KafkaMetricsGroup { // delayed-fetch expiration meters, one per fetcherType tag
+ private def expiredRequestMeter(fetcherType: String) = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS, tags = Map("fetcherType" -> fetcherType))
+ val followerExpiredRequestMeter = expiredRequestMeter("follower")
+ val consumerExpiredRequestMeter = expiredRequestMeter("consumer")
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/server/DelayedOperation.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedOperation.scala b/core/src/main/scala/kafka/server/DelayedOperation.scala
index 1d11099..e317676 100644
--- a/core/src/main/scala/kafka/server/DelayedOperation.scala
+++ b/core/src/main/scala/kafka/server/DelayedOperation.scala
@@ -71,6 +71,11 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) {
def isCompleted(): Boolean = completed.get()
/**
+ * Callback run by the purgatory's expiration reaper, while holding this operation's monitor, for an operation it dequeued as expired; it runs just before forceComplete() is attempted.
+ */
+ def onExpiration(): Unit
+
+ /**
* Process for completing an operation; This function needs to be defined
* in subclasses and will be called exactly once in forceComplete()
*/
@@ -89,7 +94,7 @@ abstract class DelayedOperation(delayMs: Long) extends DelayedItem(delayMs) {
/**
* A helper purgatory class for bookkeeping delayed operations with a timeout, and expiring timed out operations.
*/
-class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeInterval: Int = 1000)
+class DelayedOperationPurgatory[T <: DelayedOperation](purgatoryName: String, brokerId: Int = 0, purgeInterval: Int = 1000)
extends Logging with KafkaMetricsGroup {
/* a list of operation watching keys */
@@ -98,18 +103,22 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI
/* background thread expiring operations that have timed out */
private val expirationReaper = new ExpiredOperationReaper
+ private val metricsTags = Map("delayedOperation" -> purgatoryName)
+
newGauge(
"PurgatorySize",
new Gauge[Int] {
def value = watched()
- }
+ },
+ metricsTags
)
newGauge(
"NumDelayedOperations",
new Gauge[Int] {
def value = delayed()
- }
+ },
+ metricsTags
)
expirationReaper.start()
@@ -283,9 +292,12 @@ class DelayedOperationPurgatory[T <: DelayedOperation](brokerId: Int = 0, purgeI
val curr = delayedQueue.poll(200L, TimeUnit.MILLISECONDS)
if (curr != null.asInstanceOf[T]) {
// if there is an expired operation, try to force complete it
- if (curr synchronized curr.forceComplete()) {
- debug("Force complete expired delayed operation %s".format(curr))
+ val completedByMe: Boolean = curr synchronized {
+ if (!curr.isCompleted()) curr.onExpiration() // curr may already be completed by another thread; don't count it as expired
+ curr.forceComplete()
}
+ if (completedByMe)
+ debug("Force complete expired delayed operation %s".format(curr))
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/server/DelayedProduce.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/DelayedProduce.scala b/core/src/main/scala/kafka/server/DelayedProduce.scala
index c229088..4d763bf 100644
--- a/core/src/main/scala/kafka/server/DelayedProduce.scala
+++ b/core/src/main/scala/kafka/server/DelayedProduce.scala
@@ -18,9 +18,14 @@
package kafka.server
+import java.util.concurrent.TimeUnit
+
+import com.yammer.metrics.core.Meter
import kafka.api.ProducerResponseStatus
import kafka.common.ErrorMapping
import kafka.common.TopicAndPartition
+import kafka.metrics.KafkaMetricsGroup
+import kafka.utils.Pool
import scala.collection._
@@ -110,6 +115,14 @@ class DelayedProduce(delayMs: Long,
false
}
+ override def onExpiration() {
+ // only partitions still waiting on acknowledgements count as expired
+ for {
+ (partition, status) <- produceMetadata.produceStatus
+ if status.acksPending
+ } DelayedProduceMetrics.recordExpiration(partition)
+ }
+
/**
* Upon completion, return the current response status along with the error code per partition
*/
@@ -118,3 +131,21 @@ class DelayedProduce(delayMs: Long,
responseCallback(responseStatus)
}
}
+
+object DelayedProduceMetrics extends KafkaMetricsGroup {
+ /* broker-wide rate of delayed produce expirations */
+ private val aggregateExpirationMeter = newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS)
+ /* per-partition meters, created on first expiration; NOTE(review): nothing in this object ever removes them */
+ private val partitionExpirationMeters = new Pool[TopicAndPartition, Meter](valueFactory = Some(newPartitionExpirationMeter _))
+
+ private def newPartitionExpirationMeter(partition: TopicAndPartition): Meter =
+ newMeter("ExpiresPerSec", "requests", TimeUnit.SECONDS,
+ tags = Map("topic" -> partition.topic, "partition" -> partition.partition.toString))
+
+ /** Marks one expiration on the aggregate meter and on the meter for `partition`. */
+ def recordExpiration(partition: TopicAndPartition) {
+ aggregateExpirationMeter.mark()
+ partitionExpirationMeters.getAndMaybePut(partition).mark()
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/main/scala/kafka/server/ReplicaManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index b82ff55..586cf4c 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -82,8 +82,10 @@ class ReplicaManager(val config: KafkaConfig,
this.logIdent = "[Replica Manager on Broker " + localBrokerId + "]: "
val stateChangeLogger = KafkaController.stateChangeLogger
- val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce](config.brokerId, config.producerPurgatoryPurgeIntervalRequests)
- val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](config.brokerId, config.fetchPurgatoryPurgeIntervalRequests)
+ val delayedProducePurgatory = new DelayedOperationPurgatory[DelayedProduce]( // purgatoryName becomes the "delayedOperation" metrics tag
+ purgatoryName = "Produce", brokerId = config.brokerId, purgeInterval = config.producerPurgatoryPurgeIntervalRequests)
+ val delayedFetchPurgatory = new DelayedOperationPurgatory[DelayedFetch](
+ purgatoryName = "Fetch", brokerId = config.brokerId, purgeInterval = config.fetchPurgatoryPurgeIntervalRequests)
newGauge(
@@ -392,6 +394,7 @@ class ReplicaManager(val config: KafkaConfig,
fetchInfo: Map[TopicAndPartition, PartitionFetchInfo],
responseCallback: Map[TopicAndPartition, FetchResponsePartitionData] => Unit) {
+ val isFromFollower = Request.isValidBrokerId(replicaId) // same broker-id test used for fetchOnlyCommitted below
val fetchOnlyFromLeader: Boolean = replicaId != Request.DebuggingConsumerId
val fetchOnlyCommitted: Boolean = ! Request.isValidBrokerId(replicaId)
@@ -421,7 +424,7 @@ class ReplicaManager(val config: KafkaConfig,
val fetchPartitionStatus = logReadResults.map { case (topicAndPartition, result) =>
(topicAndPartition, FetchPartitionStatus(result.info.fetchOffset, fetchInfo.get(topicAndPartition).get))
}
- val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, fetchPartitionStatus)
+ val fetchMetadata = FetchMetadata(fetchMinBytes, fetchOnlyFromLeader, fetchOnlyCommitted, isFromFollower, fetchPartitionStatus)
val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, responseCallback)
// create a list of (topic, partition) pairs to use as keys for this delayed fetch operation
http://git-wip-us.apache.org/repos/asf/kafka/blob/b8904e96/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
index 93f52d3..7a37617 100644
--- a/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DelayedOperationTest.scala
@@ -28,7 +28,7 @@ class DelayedOperationTest extends JUnit3Suite {
override def setUp() {
super.setUp()
- purgatory = new DelayedOperationPurgatory[MockDelayedOperation](0, 5)
+ purgatory = new DelayedOperationPurgatory[MockDelayedOperation](purgatoryName = "mock", brokerId = 0, purgeInterval = 5)
}
override def tearDown() {
@@ -114,6 +114,10 @@ class DelayedOperationTest extends JUnit3Suite {
false
}
+ override def onExpiration(): Unit = {
+ // nothing to record: the mock only has to implement the abstract callback
+ }
+
override def onComplete() {
synchronized {
notify()