You are viewing a plain text version of this content. The canonical link for it is here.
Posted to jira@kafka.apache.org by "Haoze Wu (Jira)" <ji...@apache.org> on 2021/11/20 03:24:00 UTC
[jira] [Updated] (KAFKA-13468) Consumers may hang because IOException in Log# does not trigger KafkaStorageException
[ https://issues.apache.org/jira/browse/KAFKA-13468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Haoze Wu updated KAFKA-13468:
-----------------------------
Description:
When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is initialized, it may encounter an IO exception in the locally block, e.g., when the log directory cannot be created due to permission issue or IOException in `initializeLeaderEpochCache`, `initializePartitionMetadata`, etc.
{code:java}
class Log(...) {
// ...
locally {
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
initializeLeaderEpochCache()
initializePartitionMetadata()
val nextOffset = loadSegments()
// ...
}
// ...
}{code}
We found that the broker encountering the IO exception prints an KafkaApi error log like the following and proceeds.
{code:java}
[2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', topicId=573bAVHfRQeXApzAKevNIg, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', topicId=12dW2FxLTiyKmGi41HhdZQ, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', topicId=_yvmANyZSoK_PTV0e-nqCA, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper) {code}
But all the consumers that are consuming data from the affected topics (“gray-2-0”, “gray-1-0”, “gray-3-0”) are not able to proceed. These consumers don’t have any error log related to this issue. They hang for more than 3 minutes.
The IOException sometimes affects multiple offset topics:
{code:java}
[2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', topicId=_MiMTCViS76osIyDdxekIg, partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=48, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=45, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), ...
addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791)]) (kafka.server.RequestHandlerHelper) {code}
*Analysis*
The key stacktrace is as follows:
{code:java}
"java.lang.Thread,run,748",
"kafka.server.KafkaRequestHandler,run,74",
"kafka.server.KafkaApis,handle,236",
"kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
"kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
"kafka.server.ReplicaManager,makeLeaders,1566",
"scala.collection.mutable.HashMap,foreachEntry,499",
"scala.collection.mutable.HashMap$Node,foreachEntry,633",
"kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
"kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
"kafka.cluster.Partition,makeLeader,548",
"kafka.cluster.Partition,$anonfun$makeLeader$1,564",
"kafka.cluster.Partition,createLogIfNotExists,324",
"kafka.cluster.Partition,createLog,344",
"kafka.log.LogManager,getOrCreateLog,783",
"scala.Option,getOrElse,201",
"kafka.log.LogManager,$anonfun$getOrCreateLog$1,830",
"kafka.log.Log$,apply,2601",
"kafka.log.Log,<init>,323" {code}
Basically, the IOException is not be handled by Log but instead gets propagated all the way back to `core/src/main/scala/kafka/server/KafkaApis.scala`
{code:java}
override def handle(request: RequestChannel.Request): Unit = {
try {
request.header.apiKey match {
// ...
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
// ...
}
} catch {
case e: FatalExitError => throw e
case e: Throwable => requestHelper.handleError(request, e)
} finally {
// ...
}
}
{code}
I also notice the ReplicaManager in `core/src/main/scala/kafka/server/ReplicaManager.scala` has a relevant comment about “unexpected error” with a TODO.
{code:java}
/*
* Make the current broker to become leader for a given set of partitions by:
*
* 1. Stop fetchers for these partitions
* 2. Update the partition metadata in cache
* 3. Add these partitions to the leader partitions set
*
* If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
* the error message will be set on each partition since we do not know which partition caused it. Otherwise,
* return the set of partitions that are made leader due to this method
*
* TODO: the above may need to be fixed later
*/
private def makeLeaders(...): Set[Partition] = {
// ...
try {
// ...
partitionStates.forKeyValue { (partition, partitionState) =>
try {
if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) // line 1568
partitionsToMakeLeaders += partition
else
stateChangeLogger.info(...)
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(...)
val dirOpt = getLogDir(partition.topicPartition)
error(...)
responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
}
}
} catch {
case e: Throwable =>
partitionStates.keys.foreach { partition =>
stateChangeLogger.error(...)
}
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
// ...
} {code}
*Fix*
To fix this issue, I think we should catch the potential IOException when Log is initialized, and then throw a KafkaStorageException, just like many other IOException handlers in Kafka, e.g., [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120]
After applying this fix, the aforementioned symptoms will disappear, i.e., the consumers will not hang and proceed to finish the remaining workload.
One question is whether we should also use `logDirFailureChannel.maybeAddOfflineLogDir` to handle the IOException, like [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120] and [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L126-L139] . If so, `logDirFailureChannel.maybeAddOfflineLogDir` would crash the node according to the protocol in [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L268-L277] and [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L327-L332]
was:
When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is initialized, it may encounter an IO exception in the locally block, e.g., when the log directory cannot be created due to permission issue or IOException in `initializeLeaderEpochCache`, `initializePartitionMetadata`, etc.
{code:java}
class Log(...) {
// ...
locally {
// create the log directory if it doesn't exist
Files.createDirectories(dir.toPath)
initializeLeaderEpochCache()
initializePartitionMetadata()
val nextOffset = loadSegments()
// ...
}
// ...
}{code}
We found that the broker encountering the IO exception prints an KafkaApi error log like the following and proceeds.
{code:java}
[2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', topicId=573bAVHfRQeXApzAKevNIg, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', topicId=12dW2FxLTiyKmGi41HhdZQ, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', topicId=_yvmANyZSoK_PTV0e-nqCA, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper) {code}
But all the consumers that are consuming data from the affected topics (“gray-2-0”, “gray-1-0”, “gray-3-0”) are not able to proceed. These consumers don’t have any error log related to this issue. They hang for more than 3 minutes.
The IOException sometimes affects multiple offset topics:
{code:java}
[2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', topicId=_MiMTCViS76osIyDdxekIg, partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=48, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=45, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), ...
addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791)]) (kafka.server.RequestHandlerHelper) {code}
*Analysis*
The key stacktrace is as follows:
{code:java}
"java.lang.Thread,run,748",
"kafka.server.KafkaRequestHandler,run,74",
"kafka.server.KafkaApis,handle,236",
"kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
"kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
"kafka.server.ReplicaManager,makeLeaders,1566",
"scala.collection.mutable.HashMap,foreachEntry,499",
"scala.collection.mutable.HashMap$Node,foreachEntry,633",
"kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
"kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
"kafka.cluster.Partition,makeLeader,548",
"kafka.cluster.Partition,$anonfun$makeLeader$1,564",
"kafka.cluster.Partition,createLogIfNotExists,324",
"kafka.cluster.Partition,createLog,344",
"kafka.log.LogManager,getOrCreateLog,783",
"scala.Option,getOrElse,201",
"kafka.log.LogManager,$anonfun$getOrCreateLog$1,830",
"kafka.log.Log$,apply,2601",
"kafka.log.Log,<init>,323" {code}
Basically, the IOException is not be handled by Log but instead gets propagated all the way back to `core/src/main/scala/kafka/server/KafkaApis.scala`
{code:java}
override def handle(request: RequestChannel.Request): Unit = {
try {
request.header.apiKey match {
// ...
case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
// ...
}
} catch {
case e: FatalExitError => throw e
case e: Throwable => requestHelper.handleError(request, e)
} finally {
// ...
}
}
{code}
I also notice the ReplicaManager in `core/src/main/scala/kafka/server/ReplicaManager.scala` has a relevant comment about “unexpected error” with a TODO.
{code:java}
/*
* Make the current broker to become leader for a given set of partitions by:
*
* 1. Stop fetchers for these partitions
* 2. Update the partition metadata in cache
* 3. Add these partitions to the leader partitions set
*
* If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
* the error message will be set on each partition since we do not know which partition caused it. Otherwise,
* return the set of partitions that are made leader due to this method
*
* TODO: the above may need to be fixed later
*/
private def makeLeaders(...): Set[Partition] = {
// ...
try {
// ...
partitionStates.forKeyValue { (partition, partitionState) =>
try {
if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) // line 1568
partitionsToMakeLeaders += partition
else
stateChangeLogger.info(...)
} catch {
case e: KafkaStorageException =>
stateChangeLogger.error(...)
val dirOpt = getLogDir(partition.topicPartition)
error(...)
responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
}
}
} catch {
case e: Throwable =>
partitionStates.keys.foreach { partition =>
stateChangeLogger.error(...)
}
// Re-throw the exception for it to be caught in KafkaApis
throw e
}
// ...
} {code}
*Fix*
To fix this issue, I think we should catch the potential IOException when Log is initialized, and then throw a KafkaStorageException, just like many other IOException handlers in Kafka, e.g., [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120]
After applying this fix, the aforementioned symptoms will disappear, i.e., the consumers will not hang and proceed to finish the remaining workload.
One question is whether we should also use `logDirFailureChannel.maybeAddOfflineLogDir` to handle the IOException, like [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120] and [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L126-L139] . If so, `logDirFailureChannel.maybeAddOfflineLogDir` would crash the node according to the protocol in [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L268-L277] and [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L327-L332]
> Consumers may hang because IOException in Log#<init> does not trigger KafkaStorageException
> -------------------------------------------------------------------------------------------
>
> Key: KAFKA-13468
> URL: https://issues.apache.org/jira/browse/KAFKA-13468
> Project: Kafka
> Issue Type: Bug
> Components: log
> Affects Versions: 2.8.0
> Reporter: Haoze Wu
> Priority: Major
>
> When the Kafka Log class (`core/src/main/scala/kafka/log/Log.scala`) is initialized, it may encounter an IO exception in the locally block, e.g., when the log directory cannot be created due to permission issue or IOException in `initializeLeaderEpochCache`, `initializePartitionMetadata`, etc.
> {code:java}
> class Log(...) {
> // ...
> locally {
> // create the log directory if it doesn't exist
> Files.createDirectories(dir.toPath)
> initializeLeaderEpochCache()
> initializePartitionMetadata()
> val nextOffset = loadSegments()
> // ...
> }
> // ...
> }{code}
> We found that the broker encountering the IO exception prints an KafkaApi error log like the following and proceeds.
> {code:java}
> [2021-11-17 22:41:30,057] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=1, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967362, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='gray-2-0', topicId=573bAVHfRQeXApzAKevNIg, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-2-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-1-0', topicId=12dW2FxLTiyKmGi41HhdZQ, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-1-0', partitionIndex=1, controllerEpoch=1, leader=3, leaderEpoch=0, isr=[3, 1], zkVersion=0, replicas=[3, 1], addingReplicas=[], removingReplicas=[], isNew=true)]), LeaderAndIsrTopicState(topicName='gray-3-0', topicId=_yvmANyZSoK_PTV0e-nqCA, partitionStates=[LeaderAndIsrPartitionState(topicName='gray-3-0', partitionIndex=1, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1, 3], zkVersion=0, replicas=[1, 3], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791), LeaderAndIsrLiveLeader(brokerId=3, hostName='localhost', port=9793)]) (kafka.server.RequestHandlerHelper) {code}
> But all the consumers that are consuming data from the affected topics (“gray-2-0”, “gray-1-0”, “gray-3-0”) are not able to proceed. These consumers don’t have any error log related to this issue. They hang for more than 3 minutes.
> The IOException sometimes affects multiple offset topics:
> {code:java}
> [2021-11-18 10:57:41,289] ERROR [KafkaApi-1] Error when handling request: clientId=1, correlationId=11, api=LEADER_AND_ISR, version=5, body=LeaderAndIsrRequestData(controllerId=1, controllerEpoch=1, brokerEpoch=4294967355, type=0, ungroupedPartitionStates=[], topicStates=[LeaderAndIsrTopicState(topicName='__consumer_offsets', topicId=_MiMTCViS76osIyDdxekIg, partitionStates=[LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=15, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=48, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=45, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true), ...
> addingReplicas=[], removingReplicas=[], isNew=true), LeaderAndIsrPartitionState(topicName='__consumer_offsets', partitionIndex=33, controllerEpoch=1, leader=1, leaderEpoch=0, isr=[1], zkVersion=0, replicas=[1], addingReplicas=[], removingReplicas=[], isNew=true)])], liveLeaders=[LeaderAndIsrLiveLeader(brokerId=1, hostName='localhost', port=9791)]) (kafka.server.RequestHandlerHelper) {code}
> *Analysis*
> The key stacktrace is as follows:
> {code:java}
> "java.lang.Thread,run,748",
> "kafka.server.KafkaRequestHandler,run,74",
> "kafka.server.KafkaApis,handle,236",
> "kafka.server.KafkaApis,handleLeaderAndIsrRequest,258",
> "kafka.server.ReplicaManager,becomeLeaderOrFollower,1411",
> "kafka.server.ReplicaManager,makeLeaders,1566",
> "scala.collection.mutable.HashMap,foreachEntry,499",
> "scala.collection.mutable.HashMap$Node,foreachEntry,633",
> "kafka.utils.Implicits$MapExtensionMethods$,$anonfun$forKeyValue$1,62",
> "kafka.server.ReplicaManager,$anonfun$makeLeaders$5,1568",
> "kafka.cluster.Partition,makeLeader,548",
> "kafka.cluster.Partition,$anonfun$makeLeader$1,564",
> "kafka.cluster.Partition,createLogIfNotExists,324",
> "kafka.cluster.Partition,createLog,344",
> "kafka.log.LogManager,getOrCreateLog,783",
> "scala.Option,getOrElse,201",
> "kafka.log.LogManager,$anonfun$getOrCreateLog$1,830",
> "kafka.log.Log$,apply,2601",
> "kafka.log.Log,<init>,323" {code}
> Basically, the IOException is not be handled by Log but instead gets propagated all the way back to `core/src/main/scala/kafka/server/KafkaApis.scala`
> {code:java}
> override def handle(request: RequestChannel.Request): Unit = {
> try {
> request.header.apiKey match {
> // ...
> case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
> // ...
> }
> } catch {
> case e: FatalExitError => throw e
> case e: Throwable => requestHelper.handleError(request, e)
> } finally {
> // ...
> }
> }
> {code}
> I also notice the ReplicaManager in `core/src/main/scala/kafka/server/ReplicaManager.scala` has a relevant comment about “unexpected error” with a TODO.
> {code:java}
> /*
> * Make the current broker to become leader for a given set of partitions by:
> *
> * 1. Stop fetchers for these partitions
> * 2. Update the partition metadata in cache
> * 3. Add these partitions to the leader partitions set
> *
> * If an unexpected error is thrown in this function, it will be propagated to KafkaApis where
> * the error message will be set on each partition since we do not know which partition caused it. Otherwise,
> * return the set of partitions that are made leader due to this method
> *
> * TODO: the above may need to be fixed later
> */
> private def makeLeaders(...): Set[Partition] = {
> // ...
> try {
> // ...
> partitionStates.forKeyValue { (partition, partitionState) =>
> try {
> if (partition.makeLeader(partitionState, highWatermarkCheckpoints)) // line 1568
> partitionsToMakeLeaders += partition
> else
> stateChangeLogger.info(...)
> } catch {
> case e: KafkaStorageException =>
> stateChangeLogger.error(...)
> val dirOpt = getLogDir(partition.topicPartition)
> error(...)
> responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
> }
> }
> } catch {
> case e: Throwable =>
> partitionStates.keys.foreach { partition =>
> stateChangeLogger.error(...)
> }
> // Re-throw the exception for it to be caught in KafkaApis
> throw e
> }
> // ...
> } {code}
> *Fix*
> To fix this issue, I think we should catch the potential IOException when Log is initialized, and then throw a KafkaStorageException, just like many other IOException handlers in Kafka, e.g., [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120]
> After applying this fix, the aforementioned symptoms will disappear, i.e., the consumers will not hang and proceed to finish the remaining workload.
> One question is whether we should also use `logDirFailureChannel.maybeAddOfflineLogDir` to handle the IOException, like [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L92-L120] and [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala#L126-L139] . If so, `logDirFailureChannel.maybeAddOfflineLogDir` would crash the node according to the protocol in [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L268-L277] and [https://github.com/apache/kafka/blob/ebb1d6e21cc9213071ee1c6a15ec3411fc215b81/core/src/main/scala/kafka/server/ReplicaManager.scala#L327-L332]
--
This message was sent by Atlassian Jira
(v8.20.1#820001)