You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/04/23 20:27:44 UTC

[GitHub] [kafka] lbradstreet commented on a change in pull request #8467: MINOR: reduce allocations in log start and recovery checkpoints

lbradstreet commented on a change in pull request #8467:
URL: https://github.com/apache/kafka/pull/8467#discussion_r414100944



##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -1003,9 +1003,17 @@ class LogManager(logDirs: Seq[File],
   /**
    * Map of log dir to logs by topic and partitions in that dir
    */
-  private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
-    (this.currentLogs.toList ++ this.futureLogs.toList).toMap
-      .groupBy { case (_, log) => log.parentDir }
+  def logsByDir: Map[String, Map[TopicPartition, Log]] = {
+    // This code is called often by checkpoint processes and is written in a way that reduces
+    // allocations and CPU with many topic partitions.
+    // When changing this code please measure the changes with org.apache.kafka.jmh.server.CheckpointBench
+    val byDir = new mutable.AnyRefMap[String, mutable.AnyRefMap[TopicPartition, Log]]()
+    def addToDir(tp: TopicPartition, log: Log): Unit = {
+      byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
+    }
+    currentLogs.foreach { case (tp, log) => addToDir(tp, log) }
+    futureLogs.foreach { case (tp, log) => addToDir(tp, log) }

Review comment:
       I tried out your suggestion and, interestingly, saw more allocations. Note the deliberately defensive use of `var item` in `foreachEntry` below, which I added to rule out an extra allocation in the loop itself.
   
   ```
   --- a/core/src/main/scala/kafka/log/LogManager.scala
   +++ b/core/src/main/scala/kafka/log/LogManager.scala
   @@ -1011,8 +1011,8 @@ class LogManager(logDirs: Seq[File],
        def addToDir(tp: TopicPartition, log: Log): Unit = {
          byDir.getOrElseUpdate(log.parentDir, new mutable.AnyRefMap[TopicPartition, Log]()).put(tp, log)
        }
   -    currentLogs.foreach { case (tp, log) => addToDir(tp, log) }
   -    futureLogs.foreach { case (tp, log) => addToDir(tp, log) }
   +    currentLogs.foreachEntry(addToDir)
   +    futureLogs.foreachEntry(addToDir)
        byDir
      }
    
   diff --git a/core/src/main/scala/kafka/utils/Pool.scala b/core/src/main/scala/kafka/utils/Pool.scala
   index 964de7eae2..450f90611b 100644
   --- a/core/src/main/scala/kafka/utils/Pool.scala
   +++ b/core/src/main/scala/kafka/utils/Pool.scala
   @@ -74,6 +74,15 @@ class Pool[K,V](valueFactory: Option[K => V] = None) extends Iterable[(K, V)] {
      def values: Iterable[V] = pool.values.asScala
    
      def clear(): Unit = { pool.clear() }
   +
   +  def foreachEntry(f: (K, V) => Unit): Unit = {
   +    val iter = iterator
   +    var item: (K, V) = null
   +    while(iter.hasNext) {
   +      item = iter.next()
   +      f(item._1, item._2)
   +    }
   +  }
   
   CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         2000  thrpt   15  1428889.965 ±  75131.113    B/op
   vs trunk:
   CheckpointBench.measureCheckpointLogStartOffsets:·gc.alloc.rate.norm                         3         2000  thrpt   15  1284326.850 ±  75148.430    B/op
   
   ```




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org