You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2023/02/14 17:15:15 UTC

[kafka] branch trunk updated: MINOR: Better logging to distinguish clean vs unclean loading times (#13242)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 9584b48a2a8 MINOR: Better logging to distinguish clean vs unclean loading times (#13242)
9584b48a2a8 is described below

commit 9584b48a2a873c692d5054dd06b66232dc25e080
Author: Jason Gustafson <ja...@confluent.io>
AuthorDate: Tue Feb 14 09:15:05 2023 -0800

    MINOR: Better logging to distinguish clean vs unclean loading times (#13242)
    
    The current logging during log loading makes it difficult to analyze the behavior after a clean shutdown versus an unclean shutdown, because the log message looks the same either way. Additionally, the message about unclean recovery is also emitted when a new broker is initializing from an empty log dir. This patch adds some additional information to the existing log messages to make it easier to distinguish these cases.
    
    Reviewers: David Jacot <dj...@confluent.io>
---
 core/src/main/scala/kafka/log/LogManager.scala | 20 +++++++++++++++-----
 1 file changed, 15 insertions(+), 5 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 6b2704d0fc1..d2b9ec56f08 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -354,6 +354,7 @@ class LogManager(logDirs: Seq[File],
       error(s"Error while loading log dir $logDirAbsolutePath", e)
     }
 
+    val uncleanLogDirs = mutable.Buffer.empty[String]
     for (dir <- liveLogDirs) {
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
@@ -364,14 +365,10 @@ class LogManager(logDirs: Seq[File],
 
         val cleanShutdownFile = new File(dir, LogLoader.CleanShutdownFile)
         if (cleanShutdownFile.exists) {
-          info(s"Skipping recovery for all logs in $logDirAbsolutePath since clean shutdown file was found")
           // Cache the clean shutdown status and use that for rest of log loading workflow. Delete the CleanShutdownFile
           // so that if broker crashes while loading the log, it is considered hard shutdown during the next boot up. KAFKA-10471
           Files.deleteIfExists(cleanShutdownFile.toPath)
           hadCleanShutdown = true
-        } else {
-          // log recovery itself is being performed by `Log` class during initialization
-          info(s"Attempting recovery for all logs in $logDirAbsolutePath since no clean shutdown file was found")
         }
 
         var recoveryPoints = Map[TopicPartition, Long]()
@@ -401,6 +398,17 @@ class LogManager(logDirs: Seq[File],
         numTotalLogs += logsToLoad.length
         numRemainingLogs.put(dir.getAbsolutePath, logsToLoad.length)
 
+        if (logsToLoad.isEmpty) {
+          info(s"No logs found to be loaded in $logDirAbsolutePath")
+        } else if (hadCleanShutdown) {
+          info(s"Skipping recovery of ${logsToLoad.length} logs from $logDirAbsolutePath since " +
+            "clean shutdown file was found")
+        } else {
+          info(s"Recovering ${logsToLoad.length} logs from $logDirAbsolutePath since no " +
+            "clean shutdown file was found")
+          uncleanLogDirs.append(logDirAbsolutePath)
+        }
+
         val jobsForDir = logsToLoad.map { logDir =>
           val runnable: Runnable = () => {
             debug(s"Loading log $logDir")
@@ -454,7 +462,9 @@ class LogManager(logDirs: Seq[File],
       threadPools.foreach(_.shutdown())
     }
 
-    info(s"Loaded $numTotalLogs logs in ${time.hiResClockMs() - startMs}ms.")
+    val elapsedMs = time.hiResClockMs() - startMs
+    val printedUncleanLogDirs = if (uncleanLogDirs.isEmpty) "" else s" (unclean log dirs = $uncleanLogDirs)"
+    info(s"Loaded $numTotalLogs logs in ${elapsedMs}ms$printedUncleanLogDirs")
   }
 
   private[log] def addLogRecoveryMetrics(numRemainingLogs: ConcurrentMap[String, Int],