You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by sh...@apache.org on 2022/06/04 08:22:11 UTC
[kafka] branch 3.2 updated: KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly (#12136)
This is an automated email from the ASF dual-hosted git repository.
showuon pushed a commit to branch 3.2
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/3.2 by this push:
new 90db4f47d6 KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly (#12136)
90db4f47d6 is described below
commit 90db4f47d6394fd4b90b934bf0bb2d7965cd73a6
Author: Luke Chen <sh...@gmail.com>
AuthorDate: Thu Jun 2 14:15:51 2022 +0800
KAFKA-13773: catch kafkaStorageException to avoid broker shutdown directly (#12136)
When the LogManager starts up and loads logs, we expect to catch any IOException (e.g. an out-of-space error) and mark the log dir as offline. Later, we handle the offline logDir in ReplicaManager, so that the cleanShutdown file won't be created when all logDirs are offline. The reason the broker shut down with a cleanShutdown file after a full disk is that, during loadLogs and log recovery, we write the leader-epoch-checkpoint file. And if any IOException is thrown, we'll wrap it as KafkaStor [...]
This PR fixes the issue by catching any KafkaStorageException that has an IOException cause during loadLogs, and marking the logDir as offline so that the ReplicaManager can handle the offline logDirs.
Reviewers: Jun Rao <ju...@confluent.io>, Alok Thatikunta <al...@gmail.com>
---
core/src/main/scala/kafka/log/LogManager.scala | 14 ++--
core/src/main/scala/kafka/server/KafkaServer.scala | 8 ++-
.../test/scala/unit/kafka/log/LogLoaderTest.scala | 84 +++++++++++++++++-----
.../unit/kafka/server/ServerShutdownTest.scala | 27 +++++--
4 files changed, 107 insertions(+), 26 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index b81f6a928a..93d1eee740 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -317,6 +317,11 @@ class LogManager(logDirs: Seq[File],
val jobs = ArrayBuffer.empty[Seq[Future[_]]]
var numTotalLogs = 0
+ def handleIOException(logDirAbsolutePath: String, e: IOException): Unit = {
+ offlineDirs.add((logDirAbsolutePath, e))
+ error(s"Error while loading log dir $logDirAbsolutePath", e)
+ }
+
for (dir <- liveLogDirs) {
val logDirAbsolutePath = dir.getAbsolutePath
var hadCleanShutdown: Boolean = false
@@ -375,8 +380,10 @@ class LogManager(logDirs: Seq[File],
s"($currentNumLoaded/${logsToLoad.length} loaded in $logDirAbsolutePath)")
} catch {
case e: IOException =>
- offlineDirs.add((logDirAbsolutePath, e))
- error(s"Error while loading log dir $logDirAbsolutePath", e)
+ handleIOException(logDirAbsolutePath, e)
+ case e: KafkaStorageException if e.getCause.isInstanceOf[IOException] =>
+ // KafkaStorageException might be thrown, ex: during writing LeaderEpochFileCache
+ // And while converting IOException to KafkaStorageException, we've already handled the exception. So we can ignore it here.
}
}
runnable
@@ -385,8 +392,7 @@ class LogManager(logDirs: Seq[File],
jobs += jobsForDir.map(pool.submit)
} catch {
case e: IOException =>
- offlineDirs.add((logDirAbsolutePath, e))
- error(s"Error while loading log dir $logDirAbsolutePath", e)
+ handleIOException(logDirAbsolutePath, e)
}
}
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 67013d3391..5e66fd7f05 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -830,7 +830,13 @@ class KafkaServer(
private def checkpointBrokerMetadata(brokerMetadata: ZkMetaProperties) = {
for (logDir <- config.logDirs if logManager.isLogDirOnline(new File(logDir).getAbsolutePath)) {
val checkpoint = brokerMetadataCheckpoints(logDir)
- checkpoint.write(brokerMetadata.toProperties)
+ try {
+ checkpoint.write(brokerMetadata.toProperties)
+ } catch {
+ case e: IOException =>
+ val dirPath = checkpoint.file.getAbsolutePath
+ logDirFailureChannel.maybeAddOfflineLogDir(dirPath, s"Error while writing meta.properties to $dirPath", e)
+ }
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index a6b114320a..4f659da7dd 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -17,7 +17,7 @@
package kafka.log
-import java.io.{BufferedWriter, File, FileWriter}
+import java.io.{BufferedWriter, File, FileWriter, IOException}
import java.nio.ByteBuffer
import java.nio.file.{Files, NoSuchFileException, Paths}
import java.util.Properties
@@ -27,9 +27,11 @@ import kafka.server.{BrokerTopicStats, FetchDataInfo, KafkaConfig, LogDirFailure
import kafka.server.metadata.MockConfigRepository
import kafka.utils.{CoreUtils, MockTime, Scheduler, TestUtils}
import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.record.{CompressionType, ControlRecordType, DefaultRecordBatch, MemoryRecords, RecordBatch, RecordVersion, SimpleRecord, TimestampType}
import org.apache.kafka.common.utils.{Time, Utils}
-import org.junit.jupiter.api.Assertions.{assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.Assertions.{assertDoesNotThrow, assertEquals, assertFalse, assertNotEquals, assertThrows, assertTrue}
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.api.{AfterEach, BeforeEach, Test}
import org.mockito.ArgumentMatchers
import org.mockito.ArgumentMatchers.{any, anyLong}
@@ -61,6 +63,12 @@ class LogLoaderTest {
Utils.delete(tmpDir)
}
+ object ErrorTypes extends Enumeration {
+ type Errors = Value
+ val IOException, RuntimeException, KafkaStorageExceptionWithIOExceptionCause,
+ KafkaStorageExceptionWithoutIOExceptionCause = Value
+ }
+
@Test
def testLogRecoveryIsCalledUponBrokerCrash(): Unit = {
// LogManager must realize correctly if the last shutdown was not clean and the logs need
@@ -73,15 +81,19 @@ class LogLoaderTest {
var log: UnifiedLog = null
val time = new MockTime()
var cleanShutdownInterceptedValue = false
- case class SimulateError(var hasError: Boolean = false)
+ case class SimulateError(var hasError: Boolean = false, var errorType: ErrorTypes.Errors = ErrorTypes.RuntimeException)
val simulateError = SimulateError()
+ val logDirFailureChannel = new LogDirFailureChannel(logDirs.size)
val maxTransactionTimeoutMs = 5 * 60 * 1000
val maxProducerIdExpirationMs = 60 * 60 * 1000
// Create a LogManager with some overridden methods to facilitate interception of clean shutdown
- // flag and to inject a runtime error
- def interceptedLogManager(logConfig: LogConfig, logDirs: Seq[File], simulateError: SimulateError): LogManager = {
+ // flag and to inject an error
+ def interceptedLogManager(logConfig: LogConfig,
+ logDirs: Seq[File],
+ logDirFailureChannel: LogDirFailureChannel
+ ): LogManager = {
new LogManager(
logDirs = logDirs.map(_.getAbsoluteFile),
initialOfflineDirs = Array.empty[File],
@@ -98,7 +110,7 @@ class LogLoaderTest {
interBrokerProtocolVersion = config.interBrokerProtocolVersion,
scheduler = time.scheduler,
brokerTopicStats = new BrokerTopicStats(),
- logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
+ logDirFailureChannel = logDirFailureChannel,
time = time,
keepPartitionMetadataFile = config.usesTopicId) {
@@ -106,7 +118,16 @@ class LogLoaderTest {
logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,
topicConfigs: Map[String, LogConfig]): UnifiedLog = {
if (simulateError.hasError) {
- throw new RuntimeException("Simulated error")
+ simulateError.errorType match {
+ case ErrorTypes.KafkaStorageExceptionWithIOExceptionCause =>
+ throw new KafkaStorageException(new IOException("Simulated Kafka storage error with IOException cause"))
+ case ErrorTypes.KafkaStorageExceptionWithoutIOExceptionCause =>
+ throw new KafkaStorageException("Simulated Kafka storage error without IOException cause")
+ case ErrorTypes.IOException =>
+ throw new IOException("Simulated IO error")
+ case _ =>
+ throw new RuntimeException("Simulated Runtime error")
+ }
}
cleanShutdownInterceptedValue = hadCleanShutdown
val topicPartition = UnifiedLog.parseTopicPartitionName(logDir)
@@ -132,10 +153,24 @@ class LogLoaderTest {
}
}
+ def initializeLogManagerForSimulatingErrorTest(logDirFailureChannel: LogDirFailureChannel = new LogDirFailureChannel(logDirs.size)
+ ): (LogManager, Executable) = {
+ val logManager: LogManager = interceptedLogManager(logConfig, logDirs, logDirFailureChannel)
+ log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+
+ assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not be offline before load logs")
+
+ val runLoadLogs: Executable = () => {
+ val defaultConfig = logManager.currentDefaultConfig
+ logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
+ }
+
+ (logManager, runLoadLogs)
+ }
+
val cleanShutdownFile = new File(logDir, LogLoader.CleanShutdownFile)
locally {
- val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError)
- log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+ val (logManager, _) = initializeLogManagerForSimulatingErrorTest()
// Load logs after a clean shutdown
Files.createFile(cleanShutdownFile.toPath)
@@ -156,22 +191,37 @@ class LogLoaderTest {
}
locally {
- simulateError.hasError = true
- val logManager: LogManager = interceptedLogManager(logConfig, logDirs, simulateError)
- log = logManager.getOrCreateLog(topicPartition, isNew = true, topicId = None)
+ val (logManager, runLoadLogs) = initializeLogManagerForSimulatingErrorTest(logDirFailureChannel)
- // Simulate error
- assertThrows(classOf[RuntimeException], () => {
- val defaultConfig = logManager.currentDefaultConfig
- logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
- })
+ // Simulate Runtime error
+ simulateError.hasError = true
+ simulateError.errorType = ErrorTypes.RuntimeException
+ assertThrows(classOf[RuntimeException], runLoadLogs)
assertFalse(cleanShutdownFile.exists(), "Clean shutdown file must not have existed")
+ assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when Runtime Exception thrown")
+
+ // Simulate Kafka storage error with IOException cause
+ // in this case, the logDir will be added into offline list before KafkaStorageThrown. So we don't verify it here
+ simulateError.errorType = ErrorTypes.KafkaStorageExceptionWithIOExceptionCause
+ assertDoesNotThrow(runLoadLogs, "KafkaStorageException with IOException cause should be caught and handled")
+
+ // Simulate Kafka storage error without IOException cause
+ simulateError.errorType = ErrorTypes.KafkaStorageExceptionWithoutIOExceptionCause
+ assertThrows(classOf[KafkaStorageException], runLoadLogs, "should throw exception when KafkaStorageException without IOException cause")
+ assertFalse(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "log dir should not turn offline when KafkaStorageException without IOException cause thrown")
+
+ // Simulate IO error
+ simulateError.errorType = ErrorTypes.IOException
+ assertDoesNotThrow(runLoadLogs, "IOException should be caught and handled")
+ assertTrue(logDirFailureChannel.hasOfflineLogDir(logDir.getAbsolutePath), "the log dir should turn offline after IOException thrown")
+
// Do not simulate error on next call to LogManager#loadLogs. LogManager must understand that log had unclean shutdown the last time.
simulateError.hasError = false
cleanShutdownInterceptedValue = true
val defaultConfig = logManager.currentDefaultConfig
logManager.loadLogs(defaultConfig, logManager.fetchTopicConfigOverrides(defaultConfig, Set.empty))
assertFalse(cleanShutdownInterceptedValue, "Unexpected value for clean shutdown flag")
+ logManager.shutdown()
}
}
diff --git a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
index 16d17d2fd2..7e9fb1e726 100644
--- a/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
+++ b/core/src/test/scala/unit/kafka/server/ServerShutdownTest.scala
@@ -16,7 +16,7 @@
*/
package kafka.server
-import kafka.utils.{CoreUtils, TestUtils}
+import kafka.utils.{CoreUtils, Exit, TestUtils}
import java.io.{DataInputStream, File}
import java.net.ServerSocket
@@ -30,7 +30,6 @@ import kafka.zookeeper.ZooKeeperClientTimeoutException
import org.apache.kafka.clients.consumer.KafkaConsumer
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.Uuid
-import org.apache.kafka.common.errors.KafkaStorageException
import org.apache.kafka.common.metrics.Metrics
import org.apache.kafka.common.network.ListenerName
import org.apache.kafka.common.protocol.ApiKeys
@@ -41,6 +40,7 @@ import org.apache.kafka.common.utils.Time
import org.apache.kafka.metadata.BrokerState
import org.junit.jupiter.api.{BeforeEach, Test, TestInfo, Timeout}
import org.junit.jupiter.api.Assertions._
+import org.junit.jupiter.api.function.Executable
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.ValueSource
@@ -158,14 +158,33 @@ class ServerShutdownTest extends KafkaServerTestHarness {
@ParameterizedTest
@ValueSource(strings = Array("zk", "kraft"))
- def testCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
+ def testNoCleanShutdownAfterFailedStartupDueToCorruptLogs(quorum: String): Unit = {
createTopic(topic)
shutdownBroker()
config.logDirs.foreach { dirName =>
val partitionDir = new File(dirName, s"$topic-0")
partitionDir.listFiles.foreach(f => TestUtils.appendNonsenseToFile(f, TestUtils.random.nextInt(1024) + 1))
}
- verifyCleanShutdownAfterFailedStartup[KafkaStorageException](quorum)
+
+ val expectedStatusCode = Some(1)
+ @volatile var receivedStatusCode = Option.empty[Int]
+ @volatile var hasHaltProcedureCalled = false
+ Exit.setHaltProcedure((statusCode, _) => {
+ hasHaltProcedureCalled = true
+ receivedStatusCode = Some(statusCode)
+ }.asInstanceOf[Nothing])
+
+ try {
+ val recreateBrokerExec: Executable = () => recreateBroker(true)
+ // this startup should fail with no online log dir (due to corrupted log), and exit directly without throwing exception
+ assertDoesNotThrow(recreateBrokerExec)
+ // JVM should exit with status code 1
+ TestUtils.waitUntilTrue(() => hasHaltProcedureCalled == true && expectedStatusCode == receivedStatusCode,
+ s"Expected to halt directly with the expected status code:${expectedStatusCode.get}, " +
+ s"but got hasHaltProcedureCalled: $hasHaltProcedureCalled and received status code: ${receivedStatusCode.orNull}")
+ } finally {
+ Exit.resetHaltProcedure()
+ }
}
@ParameterizedTest