You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/03/31 22:28:49 UTC
[kafka] branch trunk updated: KAFKA-12575: Eliminate Log.isLogDirOffline boolean attribute (#10430)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new b6278ee KAFKA-12575: Eliminate Log.isLogDirOffline boolean attribute (#10430)
b6278ee is described below
commit b6278ee79da722f85aaefe7755a2be920887175c
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Wed Mar 31 15:26:40 2021 -0700
KAFKA-12575: Eliminate Log.isLogDirOffline boolean attribute (#10430)
This PR is a precursor to the recovery logic refactor work (KAFKA-12553).
I have made a change to eliminate the Log.isLogDirOffline attribute. This boolean also gets in the way of refactoring the recovery logic. The attribute was added in #9676, but it is redundant and can be eliminated in favor of looking up LogDirFailureChannel to check whether the logDir is offline. The performance/latency impact of such a ConcurrentHashMap lookup inside LogDirFailureChannel should be very low, given that ConcurrentHashMap reads are usually lock-free.
Tests:
Relying on existing unit/integration tests.
Reviewers: Dhruvil Shah <dh...@confluent.io>, Jun Rao <ju...@gmail.com>
---
core/src/main/scala/kafka/log/Log.scala | 17 ++++-------------
.../main/scala/kafka/server/LogDirFailureChannel.scala | 4 ++++
2 files changed, 8 insertions(+), 13 deletions(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 057716a..a469c3c 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -283,10 +283,6 @@ class Log(@volatile private var _dir: File,
@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
- // Log dir failure is handled asynchronously we need to prevent threads
- // from reading inconsistent state caused by a failure in another thread
- @volatile private var logDirOffline = false
-
/* The earliest offset which is part of an incomplete transaction. This is used to compute the
* last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset
* gets removed from the log (through record or segment deletion). In this case, the first unstable offset
@@ -1332,12 +1328,6 @@ class Log(@volatile private var _dir: File,
}
}
- private def checkForLogDirFailure(): Unit = {
- if (logDirOffline) {
- throw new KafkaStorageException(s"The log dir $parentDir is offline due to a previous IO exception.")
- }
- }
-
def maybeAssignEpochStartOffset(leaderEpoch: Int, startOffset: Long): Unit = {
leaderEpochCache.foreach { cache =>
cache.assign(leaderEpoch, startOffset)
@@ -2428,13 +2418,14 @@ class Log(@volatile private var _dir: File,
private[log] def addSegment(segment: LogSegment): LogSegment = this.segments.add(segment)
private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
+ if (logDirFailureChannel.hasOfflineLogDir(parentDir)) {
+ throw new KafkaStorageException(s"The log dir $parentDir is offline due to a previous IO exception.")
+ }
try {
- checkForLogDirFailure()
fun
} catch {
case e: IOException =>
- logDirOffline = true
- logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e)
+ logDirFailureChannel.maybeAddOfflineLogDir(parentDir, msg, e)
throw new KafkaStorageException(msg, e)
}
}
diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
index 897d3fc..71ba9ac 100644
--- a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
+++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
@@ -39,6 +39,10 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging {
private val offlineLogDirs = new ConcurrentHashMap[String, String]
private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)
+ def hasOfflineLogDir(logDir: String): Boolean = {
+ offlineLogDirs.containsKey(logDir)
+ }
+
/*
* If the given logDir is not already offline, add it to the
* set of offline log dirs and enqueue it to the logDirFailureEvent queue