Posted to commits@kafka.apache.org by sh...@apache.org on 2022/06/04 08:22:11 UTC

[kafka] branch 3.2 updated: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly (#12136)

This is an automated email from the ASF dual-hosted git repository.

showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/3.2 by this push:
     new 90db4f47d6 KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly (#12136)
90db4f47d6 is described below

commit 90db4f47d6394fd4b90b934bf0bb2d7965cd73a6
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu Jun 2 14:15:51 2022 +0800

    KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly (#12136)
    
    When logManager starts up and loads logs, we expect to catch any IOException (e.g. an out-of-space error) and turn the log dir offline. Later, we handle the offline logDir in ReplicaManager, so that the cleanShutdown file won't be created when all logDirs are offline. The reason the broker shut down with a cleanShutdown file after the disk filled up is that during loadLogs and log recovery, we write the leader-epoch-checkpoint file, and if any IOException is thrown, we wrap it as a KafkaStorageException [...]
    
    This PR fixes the issue by catching KafkaStorageExceptions whose cause is an IOException during loadLogs, and marking the logDir as offline so that the ReplicaManager handles the offline logDirs.
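    
    For context, the reason the cause check in loadLogs is safe: Kafka's checkpoint write path marks the log dir offline before wrapping the IOException. A simplified sketch of that conversion pattern (illustrative only, not the exact committed code; maybeAddOfflineLogDir is the real LogDirFailureChannel method, the helper around it is hypothetical):
    
        import java.io.IOException
        import kafka.server.LogDirFailureChannel
        import org.apache.kafka.common.errors.KafkaStorageException
    
        // The log dir is reported offline first, then the IOException is
        // rethrown wrapped in a KafkaStorageException. loadLogs can therefore
        // swallow a KafkaStorageException whose cause is an IOException,
        // knowing the offline handling already happened.
        def writeCheckpoint(logDir: String, channel: LogDirFailureChannel)
                           (write: => Unit): Unit = {
          try write
          catch {
            case e: IOException =>
              val msg = s"Error while writing checkpoint in dir $logDir"
              channel.maybeAddOfflineLogDir(logDir, msg, e)
              throw new KafkaStorageException(msg, e)
          }
        }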
    
    Reviewers: Jun Rao <ju...@confluent.io>, Alok Thatikunta <al...@gmail.com>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 14 ++--
 core/src/main/scala/kafka/server/KafkaServer.scala |  8 ++-
 .../test/scala/unit/kafka/log/LogLoaderTest.scala  | 84 +++++++++++++++++-----
 .../unit/kafka/server/ServerShutdownTest.scala     | 27 +++++--
 4 files changed, 107 insertions(+), 26 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index b81f6a928a..93d1eee740 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -317,6 +317,11 @@ class LogManager(logDirs: Seq[File],
     val jobs = ArrayBuffer.empty[Seq[Future[_]]]
     var numTotalLogs = 0
 
+    def handleIOException(logDirAbsolutePath: String, e: IOException): Unit = {
+      offlineDirs.add((logDirAbsolutePath, e))
+      error(s"Error while loading log dir $logDirAbsolutePath", e)
+    }
+
     for (dir <- liveLogDirs) {
       val logDirAbsolutePath = dir.getAbsolutePath
       var hadCleanShutdown: Boolean = false
@@ -375,8 +380,10 @@ class LogManager(logDirs: Seq[File],
                 s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
             } catch {
               case e: IOException =>
-                offlineDirs.add((logDirAbsolutePath, e))
-                error(s"Error while loading log dir $logDirAbsolutePath", e)
+                handleIOException(logDirAbsolutePath, e)
+              case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
+                // A KafkaStorageException might be thrown here, e.g. while writing the LeaderEpochFileCache.
+                // The IOException cause was already handled when it was wrapped into the KafkaStorageException, so we can ignore it here.
             }
           }
           runnable
@@ -385,8 +392,7 @@ class LogManager(logDirs: Seq[File],
         jobs += jobsForDir.map(pool.submit)
       } catch {
         case e: IOException =>
-          offlineDirs.add((logDirAbsolutePath, e))
-          error(s"Error while loading log dir $logDirAbsolutePath", e)
+          handleIOException(logDirAbsolutePath, e)
       }
     }
 
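For context, the (dir, exception) pairs collected by handleIOException above are reported to the LogDirFailureChannel once all load jobs complete; roughly (a paraphrase of LogManager code outside this diff):

    // After the load jobs finish, each collected offline dir is handed to
    // the failure channel; ReplicaManager reacts to the channel and takes
    // the dir offline instead of letting the broker die on startup.
    offlineDirs.foreach { case (dir, e) =>
      logDirFailureChannel.maybeAddOfflineLogDir(dir,
        s"Error while loading log dir $dir", e)
    }
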
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 67013d3391..5e66fd7f05 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -830,7 +830,13 @@ class KafkaServer(
   private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = {
     for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
       val checkpoint = brokerMetadataCheckpoints(logDir)
-      checkpoint.write(brokerMetadata.toProperties)
+      try {
+        checkpoint.write(brokerMetadata.toProperties)
+      } catch {
+        case e: IOException =>
+          val dirPath = checkpoint.file.getAbsolutePath
+          logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e)
+      }
     }
   }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index a6b114320a..4f659da7dd 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -17,7 +17,7 @@
 
 package kafka.log
 
-import java.io.{BufferedWriter, File, FileWriter}
+import java.io.{BufferedWriter, File, FileWriter, IOException}
 import java.nio.ByteBuffer
 import java.nio.file.{Files, NoSuchFileException, Paths}
 import java.util.Properties
@@ -27,9 +27,11 @@ import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailure
 import kafka.server.metadata.MockConfigRepository
 import kafka.utils.{CoreUtils, MockTime, Scheduler, TestUtils}
 import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.record.{CompressionType, ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}
 import org.apache.kafka.common.utils.{Time, Utils}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
 import org.mockito.ArgumentMatchers
 import org.mockito.ArgumentMatchers.{any, anyLong}
@@ -61,6 +63,12 @@ class LogLoaderTest {
     Utils.delete(tmpDir)
   }
 
+  object ErrorTypes extends Enumeration {
+    type Errors = Value
+    val IOException, RuntimeException, KafkaStorageExceptionWithIOExceptionCause,
+    KafkaStorageExceptionWithoutIOExceptionCause = Value
+  }
+
   @Test
   def testLogRecoveryIsCalledUponBrokerCrash(): Unit = {
     // LogManager must realize correctly if the last shutdown was not clean and the logs need
@@ -73,15 +81,19 @@ class LogLoaderTest {
     var log: UnifiedLog = null
     val time = new MockTime()
     var cleanShutdownInterceptedValue = false
-    case class SimulateError(var hasError: Boolean = false)
+    case class SimulateError(var hasError: Boolean = false, var errorType: ErrorTypes.Errors = ErrorTypes.RuntimeException)
     val simulateError = SimulateError()
+    val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
 
     val maxTransactionTimeoutMs = 5 * 60 * 1000
     val maxProducerIdExpirationMs = 60 * 60 * 1000
 
     // Create a LogManager with some overridden methods to facilitate interception of clean shutdown
-    // flag and to inject a runtime error
-    def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = {
+    // flag and to inject an error
+    def interceptedLogManager(logConfig: LogConfig,
+                              logDirs: Seq[File],
+                              logDirFailureChannel: LogDirFailureChannel
+                             ): LogManager = {
       new LogManager(
         logDirs = logDirs.map(_.getAbsoluteFile),
         initialOfflineDirs = Array.empty[File],
@@ -98,7 +110,7 @@ class LogLoaderTest {
         interBrokerProtocolVersion = config.interBrokerProtocolVersion,
         scheduler = time.scheduler,
         brokerTopicStats = new BrokerTopicStats(),
-        logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
+        logDirFailureChannel = logDirFailureChannel,
         time = time,
         keepPartitionMetadataFile = config.usesTopicId) {
 
@@ -106,7 +118,16 @@ class LogLoaderTest {
                              logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,
                              topicConfigs: Map[String, LogConfig]): UnifiedLog = {
           if (simulateError.hasError) {
-            throw new RuntimeException("Simulated error")
+            simulateError.errorType match {
+              case ErrorTypes.KafkaStorageExceptionWithIOExceptionCause =>
+                throw new KafkaStorageException(new IOException("Simulated Kafka storage error with IOException cause"))
+              case ErrorTypes.KafkaStorageExceptionWithoutIOExceptionCause =>
+                throw new KafkaStorageException("Simulated Kafka storage error without IOException cause")
+              case ErrorTypes.IOException =>
+                throw new IOException("Simulated IO error")
+              case _ =>
+                throw new RuntimeException("Simulated Runtime error")
+            }
           }
           cleanShutdownInterceptedValue = hadCleanShutdown
           val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
@@ -132,10 +153,24 @@ class LogLoaderTest {
       }
     }
 
+    def initializeLogManagerForSimulatingErrorTest(logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+                                                  ): (LogManager, Executable) = {
+      val logManager: LogManager = interceptedLogManager(logConfig, logDirs, logDirFailureChannel)
+      log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+
+      assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not be offline before load logs")
+
+      val runLoadLogs: Executable = () => {
+        val defaultConfig = logManager.currentDefaultConfig
+        logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
+      }
+
+      (logManager, runLoadLogs)
+    }
+
     val cleanShutdownFile = new File(logDir, LogLoader.CleanShutdownFile)
     locally {
-      val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError)
-      log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+      val (logManager, _) = initializeLogManagerForSimulatingErrorTest()
 
       // Load logs after a clean shutdown
       Files.createFile(cleanShutdownFile.toPath)
@@ -156,22 +191,37 @@ class LogLoaderTest {
     }
 
     locally {
-      simulateError.hasError = true
-      val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError)
-      log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+      val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(logDirFailureChannel)
 
-      // Simulate error
-      assertThrows(classOf[RuntimeException], () => {
-        val defaultConfig = logManager.currentDefaultConfig
-        logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
-      })
+      // Simulate Runtime error
+      simulateError.hasError = true
+      simulateError.errorType = ErrorTypes.RuntimeException
+      assertThrows(classOf[RuntimeException], runLoadLogs)
       assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed")
+      assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown")
+
+      // Simulate Kafka storage error with IOException cause
+      // in this case, the logDir will be added to the offline list before the KafkaStorageException is thrown, so we don't verify it here
+      simulateError.errorType = ErrorTypes.KafkaStorageExceptionWithIOExceptionCause
+      assertDoesNotThrow(runLoadLogs, "KafkaStorageException with IOException cause should be caught and handled")
+
+      // Simulate Kafka storage error without IOException cause
+      simulateError.errorType = ErrorTypes.KafkaStorageExceptionWithoutIOExceptionCause
+      assertThrows(classOf[KafkaStorageException], runLoadLogs, "should throw exception when KafkaStorageException without IOException cause")
+      assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when KafkaStorageException without IOException cause thrown")
+
+      // Simulate IO error
+      simulateError.errorType = ErrorTypes.IOException
+      assertDoesNotThrow(runLoadLogs, "IOException should be caught and handled")
+      assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the log dir should turn offline after IOException thrown")
+
       // Do not simulate error on next call to LogManager#loadLogs. LogManager must understand that log had unclean shutdown the last time.
       simulateError.hasError = false
       cleanShutdownInterceptedValue = true
       val defaultConfig = logManager.currentDefaultConfig
       logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
       assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag")
+      logManager.shutdown()
     }
   }
 
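The ServerShutdownTest change below relies on what happens once every log dir is offline: the broker halts with status code 1 instead of completing a clean shutdown. Roughly (a paraphrase of ReplicaManager's log-dir failure handling, not part of this diff):

    // When the last log dir fails, the broker halts immediately; since it
    // never completes a clean shutdown, no clean-shutdown file is written,
    // which is what the renamed test below verifies.
    if (logManager.liveLogDirs.isEmpty) {
      fatal("Shutdown broker because all log dirs have failed")
      Exit.halt(1)
    }
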
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 16d17d2fd2..7e9fb1e726 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,7 +16,7 @@
  */
 package kafka.server
 
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.{CoreUtils, Exit, TestUtils}
 
 import java.io.{DataInputStream, File}
 import java.net.ServerSocket
@@ -30,7 +30,6 @@ import kafka.zookeeper.ZooKeeperClientTimeoutException
 import org.apache.kafka.clients.consumer.KafkaConsumer
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.network.ListenerName
 import org.apache.kafka.common.protocol.ApiKeys
@@ -41,6 +40,7 @@ import org.apache.kafka.common.utils.Time
 import org.apache.kafka.metadata.BrokerState
 import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
 import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
 import org.junit.jupiter.params.ParameterizedTest
 import org.junit.jupiter.params.provider.ValueSource
 
@@ -158,14 +158,33 @@ class ServerShutdownTest extends KafkaServerTestHarness {
 
   @ParameterizedTest
   @ValueSource(strings = Array("zk", "kraft"))
-  def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
+  def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
     createTopic(topic)
     shutdownBroker()
     config.logDirs.foreach { dirName =>
       val partitionDir = new File(dirName, s"$topic-0")
       partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
     }
-    verifyCleanShutdownAfterFailedStartup[KafkaStorageException](quorum)
+
+    val expectedStatusCode = Some(1)
+    @volatile var receivedStatusCode = Option.empty[Int]
+    @volatile var hasHaltProcedureCalled = false
+    Exit.setHaltProcedure((statusCode, _) => {
+      hasHaltProcedureCalled = true
+      receivedStatusCode = Some(statusCode)
+    }.asInstanceOf[Nothing])
+
+    try {
+      val recreateBrokerExec: Executable = () => recreateBroker(true)
+      // this startup should fail because no log dir is online (due to the corrupted logs), and the broker should halt directly without throwing an exception
+      assertDoesNotThrow(recreateBrokerExec)
+      // JVM should exit with status code 1
+      TestUtils.waitUntilTrue(() => hasHaltProcedureCalled == true && expectedStatusCode == receivedStatusCode,
+        s"Expected to halt directly with the expected status code:${expectedStatusCode.get}, " +
+          s"but got hasHaltProcedureCalled: $hasHaltProcedureCalled and received status code: ${receivedStatusCode.orNull}")
+    } finally {
+      Exit.resetHaltProcedure()
+    }
   }
 
   @ParameterizedTest