You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2022/05/12 18:18:22 UTC

[GitHub] [kafka] junrao commented on a diff in pull request #12136: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly

junrao commented on code in PR #12136:
URL: https://github.com/apache/kafka/pull/12136#discussion_r871593948


##########
core/src/main/scala/kafka/log/LogManager.scala:
##########
@@ -376,8 +381,10 @@ class LogManager(logDirs: Seq[File],
                 s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
             } catch {
               case e: IOException =>
-                offlineDirs.add((logDirAbsolutePath, e))
-                error(s"Error while loading log dir $logDirAbsolutePath", e)
+                handleIOException(logDirAbsolutePath, e)
+              case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
+                // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache
+                handleIOException(logDirAbsolutePath, e.getCause.asInstanceOf[IOException])

Review Comment:
   If we hit a KafkaStorageException, the failed disk has already been reported to the logDirFailureChannel at the point where the exception was generated, so we probably don't need to track it in offlineDirs again.



##########
core/src/test/scala/unit/kafka/log/LogLoaderTest.scala:
##########
@@ -158,22 +197,54 @@ class LogLoaderTest {
     }
 
     locally {
-      simulateError.hasError = true
-      val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError)
-      log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+      val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+      val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.RuntimeException, logDirFailureChannel)
 
-      // Simulate error
-      assertThrows(classOf[RuntimeException], () => {
-        val defaultConfig = logManager.currentDefaultConfig
-        logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
-      })
+      // Simulate Runtime error
+      assertThrows(classOf[RuntimeException], runLoadLogs)
       assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed")
+      assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown")
+
       // Do not simulate error on next call to LogManager#loadLogs. LogManager must understand that log had unclean shutdown the last time.
       simulateError.hasError = false
       cleanShutdownInterceptedValue = true
       val defaultConfig = logManager.currentDefaultConfig
       logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
       assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag")
+      logManager.shutdown()
+    }
+
+    locally {
+      val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+      val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(true, ErrorTypes.IOException, logDirFailureChannel)
+
+      // Simulate IO error
+      assertDoesNotThrow(runLoadLogs, "IOException should be caught and handled")
+
+      assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the log dir should turn offline after IOException thrown")
+      logManager.shutdown()
+    }
+
+    locally {

Review Comment:
   Could we avoid duplicating this code by running the same logic twice in a loop (once per error type) instead of repeating the block?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org