Posted to commits@kafka.apache.org by jg...@apache.org on 2020/03/20 06:35:59 UTC

[kafka] branch trunk updated: KAFKA-9553; Improve measurement for loading groups and transactions (#8155)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 24d05aa  KAFKA-9553; Improve measurement for loading groups and transactions (#8155)
24d05aa is described below

commit 24d05aa601e05ec92948364f72e1d5f92998475d
Author: Agam Brahma <ag...@gmail.com>
AuthorDate: Thu Mar 19 23:35:16 2020 -0700

    KAFKA-9553; Improve measurement for loading groups and transactions (#8155)
    
    This patch modifies the loading time metric to account for time spent waiting for the loading task to be scheduled.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
---
 .../coordinator/group/GroupMetadataManager.scala   | 17 ++++----
 .../transaction/TransactionStateManager.scala      | 15 +++----
 .../coordinator/group/GroupCoordinatorTest.scala   |  2 +-
 .../group/GroupMetadataManagerTest.scala           | 49 ++++++++++++----------
 docs/ops.html                                      |  8 ++--
 5 files changed, 49 insertions(+), 42 deletions(-)
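
A minimal, self-contained sketch of the timing pattern the diff below applies in both
GroupMetadataManager and TransactionStateManager: capture a timestamp before the
loading task is handed to the scheduler, pass that timestamp into the task, and
record the end-to-end time (scheduler wait plus actual load) against the
partition-load-time sensor. The sketch uses a plain JDK executor and
System.currentTimeMillis() in place of Kafka's Scheduler and Time abstractions;
the names are illustrative only, not the actual Kafka API.

    import java.util.concurrent.{Executors, TimeUnit}

    object LoadTimingSketch {
      // Stand-in for Kafka's single-threaded scheduler.
      private val scheduler = Executors.newSingleThreadScheduledExecutor()

      def scheduleLoad(partition: String)(doLoad: () => Unit): Unit = {
        // Take the timestamp *before* enqueueing the task so that time spent
        // waiting in the scheduler queue is included in the reported total.
        val startTimeMs = System.currentTimeMillis()
        scheduler.schedule(new Runnable {
          override def run(): Unit = loadAndRecord(partition, startTimeMs, doLoad)
        }, 0, TimeUnit.MILLISECONDS)
      }

      private def loadAndRecord(partition: String, startTimeMs: Long, doLoad: () => Unit): Unit = {
        // Time already spent queued before the task started running.
        val schedulerTimeMs = System.currentTimeMillis() - startTimeMs
        doLoad()
        val endTimeMs = System.currentTimeMillis()
        // Total loading time, including the scheduler wait; this is the value
        // that would be recorded against the partition-load-time sensor.
        val totalLoadingTimeMs = endTimeMs - startTimeMs
        println(s"Loaded $partition in $totalLoadingTimeMs ms " +
          s"($schedulerTimeMs ms of which was spent waiting in the scheduler)")
      }
    }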

diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index 785dcc3..dd083ab 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -521,20 +521,23 @@ class GroupMetadataManager(brokerId: Int,
     val topicPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, offsetsPartition)
     if (addLoadingPartition(offsetsPartition)) {
       info(s"Scheduling loading of offsets and group metadata from $topicPartition")
-      scheduler.schedule(topicPartition.toString, () => loadGroupsAndOffsets(topicPartition, onGroupLoaded))
+      val startTimeMs = time.milliseconds()
+      scheduler.schedule(topicPartition.toString, () => loadGroupsAndOffsets(topicPartition, onGroupLoaded, startTimeMs))
     } else {
       info(s"Already loading offsets and group metadata from $topicPartition")
     }
   }
 
-  private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit): Unit = {
+  private[group] def loadGroupsAndOffsets(topicPartition: TopicPartition, onGroupLoaded: GroupMetadata => Unit, startTimeMs: java.lang.Long): Unit = {
     try {
-      val startMs = time.milliseconds()
+      val schedulerTimeMs = time.milliseconds() - startTimeMs
       doLoadGroupsAndOffsets(topicPartition, onGroupLoaded)
-      val endMs = time.milliseconds()
-      val timeLapse = endMs - startMs
-      partitionLoadSensor.record(timeLapse, endMs, false)
-      info(s"Finished loading offsets and group metadata from $topicPartition in $timeLapse milliseconds.")
+      val endTimeMs = time.milliseconds()
+      val totalLoadingTimeMs = endTimeMs - startTimeMs
+      partitionLoadSensor.record(totalLoadingTimeMs, endTimeMs, false)
+      info(s"Finished loading offsets and group metadata from $topicPartition "
+        + s"in $totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds"
+        + s" was spent in the scheduler.")
     } catch {
       case t: Throwable => error(s"Error loading offsets from $topicPartition", t)
     } finally {
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index ceed6ac..d144b6a 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -286,7 +286,6 @@ class TransactionStateManager(brokerId: Int,
   private def loadTransactionMetadata(topicPartition: TopicPartition, coordinatorEpoch: Int): Pool[String, TransactionMetadata] =  {
     def logEndOffset = replicaManager.getLogEndOffset(topicPartition).getOrElse(-1L)
 
-    val startMs = time.milliseconds()
     val loadedTransactions = new Pool[String, TransactionMetadata]
 
     replicaManager.getLog(topicPartition) match {
@@ -350,10 +349,6 @@ class TransactionStateManager(brokerId: Int,
                 currOffset = batch.nextOffset
               }
             }
-            val endMs = time.milliseconds()
-            val timeLapse = endMs - startMs
-            partitionLoadSensor.record(timeLapse, endMs, false)
-            info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in $timeLapse milliseconds")
           }
         } catch {
           case t: Throwable => error(s"Error loading transactions from transaction log $topicPartition", t)
@@ -391,11 +386,16 @@ class TransactionStateManager(brokerId: Int,
       loadingPartitions.add(partitionAndLeaderEpoch)
     }
 
-    def loadTransactions(): Unit = {
+    def loadTransactions(startTimeMs: java.lang.Long): Unit = {
+      val schedulerTimeMs = time.milliseconds() - startTimeMs
       info(s"Loading transaction metadata from $topicPartition at epoch $coordinatorEpoch")
       validateTransactionTopicPartitionCountIsStable()
 
       val loadedTransactions = loadTransactionMetadata(topicPartition, coordinatorEpoch)
+      val endTimeMs = time.milliseconds()
+      val totalLoadingTimeMs = endTimeMs - startTimeMs
+      partitionLoadSensor.record(totalLoadingTimeMs, endTimeMs, false)
+      info(s"Finished loading ${loadedTransactions.size} transaction metadata from $topicPartition in $totalLoadingTimeMs milliseconds, of which $schedulerTimeMs milliseconds was spent in the scheduler.")
 
       inWriteLock(stateLock) {
         if (loadingPartitions.contains(partitionAndLeaderEpoch)) {
@@ -433,7 +433,8 @@ class TransactionStateManager(brokerId: Int,
       info(s"Completed loading transaction metadata from $topicPartition for coordinator epoch $coordinatorEpoch")
     }
 
-    scheduler.schedule(s"load-txns-for-partition-$topicPartition", loadTransactions)
+    val scheduleStartMs = time.milliseconds()
+    scheduler.schedule(s"load-txns-for-partition-$topicPartition", () => loadTransactions(scheduleStartMs))
   }
 
   def removeTransactionsForTxnTopicPartition(partitionId: Int): Unit = {
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
index 06ebaa8..a3076d3 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupCoordinatorTest.scala
@@ -189,7 +189,7 @@ class GroupCoordinatorTest {
     EasyMock.reset(replicaManager)
     EasyMock.expect(replicaManager.getLog(otherGroupMetadataTopicPartition)).andReturn(None)
     EasyMock.replay(replicaManager)
-    groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition, group => {})
+    groupCoordinator.groupManager.loadGroupsAndOffsets(otherGroupMetadataTopicPartition, group => {}, 0L)
     assertEquals(Errors.NONE, groupCoordinator.handleDescribeGroup(otherGroupId)._1)
   }
 
diff --git a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
index 783e944..c0a6269 100644
--- a/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/coordinator/group/GroupMetadataManagerTest.scala
@@ -117,7 +117,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -149,7 +149,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -186,7 +186,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -220,7 +220,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     // Since there are no committed offsets for the group, and there is no other group metadata, we don't expect the
     // group to be loaded.
@@ -252,7 +252,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     // The group should be loaded with pending offsets.
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
@@ -299,7 +299,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -354,7 +354,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -417,7 +417,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -463,7 +463,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     // The group should be loaded with pending offsets.
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
@@ -505,7 +505,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     // The group should be loaded with pending offsets.
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
@@ -589,7 +589,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -626,7 +626,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -658,7 +658,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     assertEquals(None, groupMetadataManager.getGroup(groupId))
   }
@@ -687,7 +687,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     committedOffsets.foreach { case (topicPartition, offset) =>
@@ -711,7 +711,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => (), 0L)
 
     EasyMock.verify(logMock)
     EasyMock.verify(replicaManager)
@@ -747,7 +747,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -790,7 +790,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(logMock, replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -1937,7 +1937,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -1977,7 +1977,7 @@ class GroupMetadataManagerTest {
 
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
     assertEquals(groupId, group.groupId)
@@ -2128,7 +2128,7 @@ class GroupMetadataManagerTest {
     EasyMock.replay(logMock)
     EasyMock.replay(replicaManager)
 
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), 0L)
 
     // Empty control batch should not have caused the load to fail
     val group = groupMetadataManager.getGroup(groupId).getOrElse(fail("Group was not loaded into the cache"))
@@ -2385,9 +2385,12 @@ class GroupMetadataManagerTest {
 
     expectGroupMetadataLoad(groupMetadataTopicPartition, startOffset, records)
     EasyMock.replay(replicaManager)
-    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => ())
 
-    assertTrue(partitionLoadTime("partition-load-time-max") >= 0.0)
-    assertTrue(partitionLoadTime( "partition-load-time-avg") >= 0.0)
+    // When passed a specific start offset, assert that the measured values are in excess of that.
+    val now = time.milliseconds()
+    val diff = 1000
+    groupMetadataManager.loadGroupsAndOffsets(groupMetadataTopicPartition, _ => (), now - diff)
+    assertTrue(partitionLoadTime("partition-load-time-max") >= diff)
+    assertTrue(partitionLoadTime("partition-load-time-avg") >= diff)
   }
 }
diff --git a/docs/ops.html b/docs/ops.html
index 2a0b4ca..a098673 100644
--- a/docs/ops.html
+++ b/docs/ops.html
@@ -1072,22 +1072,22 @@
       <tr>
         <td>Max time to load group metadata</td>
         <td>kafka.server:type=group-coordinator-metrics,name=partition-load-time-max</td>
-        <td>maximum time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds</td>
+        <td>maximum time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)</td>
       </tr>
       <tr>
         <td>Avg time to load group metadata</td>
         <td>kafka.server:type=group-coordinator-metrics,name=partition-load-time-avg</td>
-        <td>average time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds</td>
+        <td>average time, in milliseconds, it took to load offsets and group metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)</td>
       </tr>
       <tr>
         <td>Max time to load transaction metadata</td>
         <td>kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-max</td>
-        <td>maximum time, in milliseconds, it took to load transaction metadata from the consumer offset partitions loaded in the last 30 seconds</td>
+        <td>maximum time, in milliseconds, it took to load transaction metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)</td>
       </tr>
       <tr>
         <td>Avg time to load transaction metadata</td>
         <td>kafka.server:type=transaction-coordinator-metrics,name=partition-load-time-avg</td>
-        <td>average time, in milliseconds, it took to load transaction metadata from the consumer offset partitions loaded in the last 30 seconds</td>
+        <td>average time, in milliseconds, it took to load transaction metadata from the consumer offset partitions loaded in the last 30 seconds (including time spent waiting for the loading task to be scheduled)</td>
       </tr>
       <tr>
         <td>Consumer Group Offset Count</td>