You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by gw...@apache.org on 2019/08/04 04:01:15 UTC
[kafka] branch trunk updated: KAFKA-6263;
Expose metrics for group and transaction metadata loading duration
This is an automated email from the ASF dual-hosted git repository.
gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 0b1dc1c KAFKA-6263; Expose metrics for group and transaction metadata loading duration
0b1dc1c is described below
commit 0b1dc1ca7be3566a4367d7d3aae2c691cf94f48e
Author: anatasiavela <an...@berkeley.edu>
AuthorDate: Sat Aug 3 21:00:46 2019 -0700
KAFKA-6263; Expose metrics for group and transaction metadata loading duration
[JIRA](https://issues.apache.org/jira/browse/KAFKA-6263)
- Add metrics to provide visibility for how long group metadata and transaction metadata take to load in order to understand some inactivity seen in the consumer groups
- Tests include mocking load times by creating a delay after each is loaded and ensuring the measured JMX metric is as it should be
Author: anatasiavela <an...@berkeley.edu>
Reviewers: Gwen Shapira, Jason Gustafson
Closes #7045 from anatasiavela/KAFKA-6263
---
.../kafka/coordinator/group/GroupCoordinator.scala | 16 +++++---
.../coordinator/group/GroupMetadataManager.scala | 20 ++++++++-
.../transaction/TransactionCoordinator.scala | 2 +-
.../transaction/TransactionStateManager.scala | 21 ++++++++--
core/src/main/scala/kafka/server/KafkaServer.scala | 2 +-
.../group/GroupCoordinatorConcurrencyTest.scala | 3 +-
.../coordinator/group/GroupCoordinatorTest.scala | 3 +-
.../group/GroupMetadataManagerTest.scala | 47 +++++++++++++++++++++-
.../TransactionCoordinatorConcurrencyTest.scala | 3 +-
.../transaction/TransactionStateManagerTest.scala | 39 +++++++++++++++++-
docs/ops.html | 20 +++++++++
11 files changed, 157 insertions(+), 19 deletions(-)
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
index 6a57d59..7874946 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala
@@ -28,6 +28,7 @@ import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.message.JoinGroupResponseData.JoinGroupResponseMember
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.RecordBatch.{NO_PRODUCER_EPOCH, NO_PRODUCER_ID}
@@ -55,7 +56,8 @@ class GroupCoordinator(val brokerId: Int,
val groupManager: GroupMetadataManager,
val heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
val joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
- time: Time) extends Logging {
+ time: Time,
+ metrics: Metrics) extends Logging {
import GroupCoordinator._
type JoinCallback = JoinGroupResult => Unit
@@ -1084,10 +1086,11 @@ object GroupCoordinator {
def apply(config: KafkaConfig,
zkClient: KafkaZkClient,
replicaManager: ReplicaManager,
- time: Time): GroupCoordinator = {
+ time: Time,
+ metrics: Metrics): GroupCoordinator = {
val heartbeatPurgatory = DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", config.brokerId)
val joinPurgatory = DelayedOperationPurgatory[DelayedJoin]("Rebalance", config.brokerId)
- apply(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, time)
+ apply(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, time, metrics)
}
private[group] def offsetConfig(config: KafkaConfig) = OffsetConfig(
@@ -1108,7 +1111,8 @@ object GroupCoordinator {
replicaManager: ReplicaManager,
heartbeatPurgatory: DelayedOperationPurgatory[DelayedHeartbeat],
joinPurgatory: DelayedOperationPurgatory[DelayedJoin],
- time: Time): GroupCoordinator = {
+ time: Time,
+ metrics: Metrics): GroupCoordinator = {
val offsetConfig = this.offsetConfig(config)
val groupConfig = GroupConfig(groupMinSessionTimeoutMs = config.groupMinSessionTimeoutMs,
groupMaxSessionTimeoutMs = config.groupMaxSessionTimeoutMs,
@@ -1116,8 +1120,8 @@ object GroupCoordinator {
groupInitialRebalanceDelayMs = config.groupInitialRebalanceDelay)
val groupMetadataManager = new GroupMetadataManager(config.brokerId, config.interBrokerProtocolVersion,
- offsetConfig, replicaManager, zkClient, time)
- new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time)
+ offsetConfig, replicaManager, zkClient, time, metrics)
+ new GroupCoordinator(config.brokerId, groupConfig, offsetConfig, groupMetadataManager, heartbeatPurgatory, joinPurgatory, time, metrics)
}
def joinError(memberId: String, error: Errors): JoinGroupResult = {
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 03c3e37..7d8499f 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -36,6 +36,8 @@ import kafka.zk.KafkaZkClient
import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.stats.{Avg, Max}
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.types.Type._
import org.apache.kafka.common.protocol.types._
@@ -53,7 +55,8 @@ class GroupMetadataManager(brokerId: Int,
config: OffsetConfig,
replicaManager: ReplicaManager,
zkClient: KafkaZkClient,
- time: Time) extends Logging with KafkaMetricsGroup {
+ time: Time,
+ metrics: Metrics) extends Logging with KafkaMetricsGroup {
private val compressionType: CompressionType = CompressionType.forId(config.offsetsTopicCompressionCodec.codec)
@@ -82,6 +85,16 @@ class GroupMetadataManager(brokerId: Int,
* We use this structure to quickly find the groups which need to be updated by the commit/abort marker. */
private val openGroupsForProducer = mutable.HashMap[Long, mutable.Set[String]]()
+ /* setup metrics*/
+ val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+
+ partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
+ "group-coordinator-metrics",
+ "The max time it took to load the partitions in the last 30sec"), new Max())
+ partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
+ "group-coordinator-metrics",
+ "The avg time it took to load the partitions in the last 30sec"), new Avg())
+
this.logIdent = s"[GroupMetadataManager brokerId=$brokerId] "
private def recreateGauge[T](name: String, gauge: Gauge[T]): Gauge[T] = {
@@ -498,7 +511,10 @@ class GroupMetadataManager(brokerId: Int,
try {
val startMs = time.milliseconds()
doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
- info(s"Finished loading offsets and group metadata from $topicPartition in ${time.milliseconds() - startMs} milliseconds.")
+ val endMs = time.milliseconds()
+ val timeLapse = endMs - startMs
+ partitionLoadSensor.record(timeLapse, endMs, false)
+ info(s"Finished loading offsets and group metadata from $topicPartition in $timeLapse milliseconds.")
} catch {
case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
} finally {
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
index 9d4eed6..6d99889 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
@@ -55,7 +55,7 @@ object TransactionCoordinator {
// we do not need to turn on reaper thread since no tasks will be expired and there are no completed tasks to be purged
val txnMarkerPurgatory = DelayedOperationPurgatory[DelayedTxnMarker]("txn-marker-purgatory", config.brokerId,
reaperEnabled = false, timerEnabled = false)
- val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig, time)
+ val txnStateManager = new TransactionStateManager(config.brokerId, zkClient, scheduler, replicaManager, txnConfig, time, metrics)
val logContext = new LogContext(s"[TransactionCoordinator id=${config.brokerId}] ")
val txnMarkerChannelManager = TransactionMarkerChannelManager(config, metrics, metadataCache, txnStateManager,
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index 45ee4e9..38caed5 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -30,6 +30,8 @@ import kafka.utils.{Logging, Pool, Scheduler}
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.{KafkaException, TopicPartition}
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.stats.{Avg, Max}
+import org.apache.kafka.common.metrics.{MetricConfig, Metrics}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -70,7 +72,8 @@ class TransactionStateManager(brokerId: Int,
scheduler: Scheduler,
replicaManager: ReplicaManager,
config: TransactionConfig,
- time: Time) extends Logging {
+ time: Time,
+ metrics: Metrics) extends Logging {
this.logIdent = "[Transaction State Manager " + brokerId + "]: "
@@ -94,6 +97,16 @@ class TransactionStateManager(brokerId: Int,
/** number of partitions for the transaction log topic */
private val transactionTopicPartitionCount = getTransactionTopicPartitionCount
+ /** setup metrics*/
+ private val partitionLoadSensor = metrics.sensor("PartitionLoadTime")
+
+ partitionLoadSensor.add(metrics.metricName("partition-load-time-max",
+ "transaction-coordinator-metrics",
+ "The max time it took to load the partitions in the last 30sec"), new Max())
+ partitionLoadSensor.add(metrics.metricName("partition-load-time-avg",
+ "transaction-coordinator-metrics",
+ "The avg time it took to load the partitions in the last 30sec"), new Avg())
+
// visible for testing only
private[transaction] def addLoadingPartition(partitionId: Int, coordinatorEpoch: Int): Unit = {
val partitionAndLeaderEpoch = TransactionPartitionAndLeaderEpoch(partitionId, coordinatorEpoch)
@@ -339,8 +352,10 @@ class TransactionStateManager(brokerId: Int,
currOffset = batch.nextOffset
}
}
-
- info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in ${time.milliseconds() - startMs} milliseconds")
+ val endMs = time.milliseconds()
+ val timeLapse = endMs - startMs
+ partitionLoadSensor.record(timeLapse, endMs, false)
+ info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in $timeLapse milliseconds")
}
} catch {
case t: Throwable => error(s"Error loading transactions from transaction log $topicPartition", t)
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 07ffe9d..6c433b7 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -276,7 +276,7 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
/* start group coordinator */
// Hardcode Time.SYSTEM for now as some Streams tests fail otherwise, it would be good to fix the underlying issue
- groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM)
+ groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, Time.SYSTEM, metrics)
groupCoordinator.startup()
/* start transaction coordinator, with a separate background thread scheduler for transaction expiration and log loading */
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
index b85035f..ec31fc9 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorConcurrencyTest.scala
@@ -26,6 +26,7 @@ import kafka.coordinator.group.GroupCoordinatorConcurrencyTest._
import kafka.server.{DelayedOperationPurgatory, KafkaConfig}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.requests.JoinGroupRequest
@@ -84,7 +85,7 @@ class GroupCoordinatorConcurrencyTest extends AbstractCoordinatorConcurrencyTest
val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
- groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
+ groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time, new Metrics())
groupCoordinator.startup(false)
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index cdf1518..5a35e33 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantLock
import kafka.cluster.Partition
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.message.LeaveGroupRequestData.MemberIdentity
import org.junit.Assert._
import org.junit.{After, Assert, Before, Test}
@@ -111,7 +112,7 @@ class GroupCoordinatorTest {
val heartbeatPurgatory = new DelayedOperationPurgatory[DelayedHeartbeat]("Heartbeat", timer, config.brokerId, reaperEnabled = false)
val joinPurgatory = new DelayedOperationPurgatory[DelayedJoin]("Rebalance", timer, config.brokerId, reaperEnabled = false)
- groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time)
+ groupCoordinator = GroupCoordinator(config, zkClient, replicaManager, heartbeatPurgatory, joinPurgatory, timer.time, new Metrics())
groupCoordinator.startup(enableMetadataExpiration = false)
// add the partition into the owned partition list
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index faca447..dbcf5ed 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -19,11 +19,12 @@ package kafka.coordinator.group
import com.yammer.metrics.Metrics
import com.yammer.metrics.core.Gauge
+import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.Collections
import java.util.Optional
import java.util.concurrent.locks.ReentrantLock
-
+import javax.management.ObjectName
import kafka.api._
import kafka.cluster.Partition
import kafka.common.OffsetAndMetadata
@@ -35,6 +36,7 @@ import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription
import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics => kMetrics}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.OffsetFetchResponse
@@ -56,6 +58,7 @@ class GroupMetadataManagerTest {
var zkClient: KafkaZkClient = null
var partition: Partition = null
var defaultOffsetRetentionMs = Long.MaxValue
+ var metrics: kMetrics = null
val groupId = "foo"
val groupInstanceId = Some("bar")
@@ -87,9 +90,10 @@ class GroupMetadataManagerTest {
EasyMock.expect(zkClient.getTopicPartitionCount(Topic.GROUP_METADATA_TOPIC_NAME)).andReturn(Some(2))
EasyMock.replay(zkClient)
+ metrics = new kMetrics()
time = new MockTime
replicaManager = EasyMock.createNiceMock(classOf[ReplicaManager])
- groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time)
+ groupMetadataManager = new GroupMetadataManager(0, ApiVersion.latestVersion, offsetConfig, replicaManager, zkClient, time, metrics)
partition = EasyMock.niceMock(classOf[Partition])
}
@@ -2051,4 +2055,43 @@ class GroupMetadataManagerTest {
group.transitionTo(CompletingRebalance)
expectMetrics(groupMetadataManager, 1, 0, 1)
}
+
+ @Test
+ def testPartitionLoadMetric(): Unit = {
+ val server = ManagementFactory.getPlatformMBeanServer
+ val mBeanName = "kafka.server:type=group-coordinator-metrics"
+ val reporter = new JmxReporter("kafka.server")
+ metrics.addReporter(reporter)
+
+ def partitionLoadTime(attribute: String): Double = {
+ server.getAttribute(new ObjectName(mBeanName), attribute).asInstanceOf[Double]
+ }
+
+ assertTrue(server.isRegistered(new ObjectName(mBeanName)))
+ assertEquals(Double.NaN, partitionLoadTime( "partition-load-time-max"), 0)
+ assertEquals(Double.NaN, partitionLoadTime("partition-load-time-avg"), 0)
+ assertTrue(reporter.containsMbean(mBeanName))
+
+ val groupMetadataTopicPartition = groupTopicPartition
+ val startOffset = 15L
+ val memberId = "98098230493"
+ val committedOffsets = Map(
+ new TopicPartition("foo", 0) -> 23L,
+ new TopicPartition("foo", 1) -> 455L,
+ new TopicPartition("bar", 0) -> 8992L
+ )
+
+ val offsetCommitRecords = createCommittedOffsetRecords(committedOffsets)
+ val groupMetadataRecord = buildStableGroupRecordWithMember(generation = 15,
+ protocolType = "consumer", protocol = "range", memberId)
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE,
+ (offsetCommitRecords ++ Seq(groupMetadataRecord)).toArray: _*)
+
+ expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
+ EasyMock.replay(replicaManager)
+ groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+
+ assertTrue(partitionLoadTime("partition-load-time-max") >= 0.0)
+ assertTrue(partitionLoadTime( "partition-load-time-avg") >= 0.0)
+ }
}
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
index bc6ed93..238ef4b 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionCoordinatorConcurrencyTest.scala
@@ -28,6 +28,7 @@ import kafka.utils.{Pool, TestUtils}
import org.apache.kafka.clients.{ClientResponse, NetworkClient}
import org.apache.kafka.common.{Node, TopicPartition}
import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
+import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.protocol.{ApiKeys, Errors}
import org.apache.kafka.common.record.{CompressionType, FileRecords, MemoryRecords, SimpleRecord}
import org.apache.kafka.common.requests._
@@ -68,7 +69,7 @@ class TransactionCoordinatorConcurrencyTest extends AbstractCoordinatorConcurren
.anyTimes()
EasyMock.replay(zkClient)
- txnStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time)
+ txnStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time, new Metrics())
for (i <- 0 until numPartitions)
txnStateManager.addLoadedTransactionsToCache(i, coordinatorEpoch, new Pool[String, TransactionMetadata]())
diff --git a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
index 14745e7..4e778dd 100644
--- a/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/transaction/TransactionStateManagerTest.scala
@@ -16,8 +16,10 @@
*/
package kafka.coordinator.transaction
+import java.lang.management.ManagementFactory
import java.nio.ByteBuffer
import java.util.concurrent.locks.ReentrantLock
+import javax.management.ObjectName
import kafka.log.Log
import kafka.server.{FetchDataInfo, FetchLogEnd, LogOffsetMetadata, ReplicaManager}
@@ -26,6 +28,7 @@ import org.scalatest.Assertions.fail
import kafka.zk.KafkaZkClient
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.internals.Topic.TRANSACTION_STATE_TOPIC_NAME
+import org.apache.kafka.common.metrics.{JmxReporter, Metrics}
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record._
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
@@ -59,9 +62,10 @@ class TransactionStateManagerTest {
.anyTimes()
EasyMock.replay(zkClient)
+ val metrics = new Metrics()
val txnConfig = TransactionConfig()
- val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time)
+ val transactionManager: TransactionStateManager = new TransactionStateManager(0, zkClient, scheduler, replicaManager, txnConfig, time, metrics)
val transactionalId1: String = "one"
val transactionalId2: String = "two"
@@ -627,4 +631,37 @@ class TransactionStateManagerTest {
EasyMock.replay(replicaManager)
}
+
+ @Test
+ def testPartitionLoadMetric(): Unit = {
+ val server = ManagementFactory.getPlatformMBeanServer
+ val mBeanName = "kafka.server:type=transaction-coordinator-metrics"
+ val reporter = new JmxReporter("kafka.server")
+ metrics.addReporter(reporter)
+
+ def partitionLoadTime(attribute: String): Double = {
+ server.getAttribute(new ObjectName(mBeanName), attribute).asInstanceOf[Double]
+ }
+
+ assertTrue(server.isRegistered(new ObjectName(mBeanName)))
+ assertEquals(Double.NaN, partitionLoadTime( "partition-load-time-max"), 0)
+ assertEquals(Double.NaN, partitionLoadTime("partition-load-time-avg"), 0)
+ assertTrue(reporter.containsMbean(mBeanName))
+
+ txnMetadata1.state = Ongoing
+ txnMetadata1.addPartitions(Set[TopicPartition](new TopicPartition("topic1", 1),
+ new TopicPartition("topic1", 1)))
+
+ txnRecords += new SimpleRecord(txnMessageKeyBytes1, TransactionLog.valueToBytes(txnMetadata1.prepareNoTransit()))
+
+ val startOffset = 15L
+ val records = MemoryRecords.withRecords(startOffset, CompressionType.NONE, txnRecords.toArray: _*)
+
+ prepareTxnLog(topicPartition, startOffset, records)
+ transactionManager.loadTransactionsForTxnTopicPartition(partitionId, 0, (_, _, _, _, _) => ())
+ scheduler.tick()
+
+ assertTrue(partitionLoadTime("partition-load-time-max") >= 0)
+ assertTrue(partitionLoadTime( "partition-load-time-avg") >= 0)
+ }
}
diff --git a/docs/ops.html b/docs/ops.html
index f94afc3..984b80d 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1029,6 +1029,26 @@
<td>Connection status of broker's ZooKeeper session which may be one of
Disconnected|SyncConnected|AuthFailed|ConnectedReadOnly|SaslAuthenticated|Expired.</td>
</tr>
+ <tr>
+ <td>Max time to load group metadata</td>
+ <td>kafka.server:type=group-coordinator-metrics,name=partition-load-time-max</td>
+ <td>maximum time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds</td>
+ </tr>
+ <tr>
+ <td>Avg time to load group metadata</td>
+ <td>kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg</td>
+ <td>average time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds</td>
+ </tr>
+ <tr>
+ <td>Max time to load transaction metadata</td>
+ <td>kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max</td>
+ <td>maximum time, in milliseconds, it took to load transaction metadata from the transaction log partitions loaded in the last 30 seconds</td>
+ </tr>
+ <tr>
+ <td>Avg time to load transaction metadata</td>
+ <td>kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg</td>
+ <td>average time, in milliseconds, it took to load transaction metadata from the transaction log partitions loaded in the last 30 seconds</td>
+ </tr>
</tbody></table>
<h4><a id="selector_monitoring" href="#selector_monitoring">Common monitoring metrics for producer/consumer/connect/streams</a></h4>