You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2021/03/31 22:28:49 UTC

[kafka] branch trunk updated: KAFKA-12575: Eliminate Log.isLogDirOffline boolean attribute (#10430)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b6278ee  KAFKA-12575: Eliminate Log.isLogDirOffline boolean attribute (#10430)
b6278ee is described below

commit b6278ee79da722f85aaefe7755a2be920887175c
Author: Kowshik Prakasam <kp...@confluent.io>
AuthorDate: Wed Mar 31 15:26:40 2021 -0700

    KAFKA-12575: Eliminate Log.isLogDirOffline boolean attribute (#10430)
    
    This PR is a precursor to the recovery logic refactor work (KAFKA-12553).
    
    I have made a change to eliminate the Log.isLogDirOffline attribute (the `logDirOffline` field in Log.scala). This boolean also gets in the way of refactoring the recovery logic. The attribute was added in #9676, but it is redundant and can be eliminated in favor of looking up LogDirFailureChannel to check whether the logDir is offline. The performance/latency impact of such a ConcurrentHashMap lookup inside LogDirFailureChannel should be very low, given that ConcurrentHashMap reads are usually lock-free.
    
    Tests:
    Relying on existing unit/integration tests.
    
    Reviewers: Dhruvil Shah <dh...@confluent.io>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/Log.scala                 | 17 ++++-------------
 .../main/scala/kafka/server/LogDirFailureChannel.scala  |  4 ++++
 2 files changed, 8 insertions(+), 13 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 057716a..a469c3c 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -283,10 +283,6 @@ class Log(@volatile private var _dir: File,
 
   @volatile private var nextOffsetMetadata: LogOffsetMetadata = _
 
-  // Log dir failure is handled asynchronously we need to prevent threads
-  // from reading inconsistent state caused by a failure in another thread
-  @volatile private var logDirOffline = false
-
   /* The earliest offset which is part of an incomplete transaction. This is used to compute the
    * last stable offset (LSO) in ReplicaManager. Note that it is possible that the "true" first unstable offset
    * gets removed from the log (through record or segment deletion). In this case, the first unstable offset
@@ -1332,12 +1328,6 @@ class Log(@volatile private var _dir: File,
     }
   }
 
-  private def checkForLogDirFailure(): Unit = {
-    if (logDirOffline) {
-      throw new KafkaStorageException(s"The log dir $parentDir is offline due to a previous IO exception.")
-    }
-  }
-
   def maybeAssignEpochStartOffset(leaderEpoch: Int, startOffset: Long): Unit = {
     leaderEpochCache.foreach { cache =>
       cache.assign(leaderEpoch, startOffset)
@@ -2428,13 +2418,14 @@ class Log(@volatile private var _dir: File,
   private[log] def addSegment(segment: LogSegment): LogSegment = this.segments.add(segment)
 
   private def maybeHandleIOException[T](msg: => String)(fun: => T): T = {
+    if (logDirFailureChannel.hasOfflineLogDir(parentDir)) {
+      throw new KafkaStorageException(s"The log dir $parentDir is offline due to a previous IO exception.")
+    }
     try {
-      checkForLogDirFailure()
       fun
     } catch {
       case e: IOException =>
-        logDirOffline = true
-        logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e)
+        logDirFailureChannel.maybeAddOfflineLogDir(parentDir, msg, e)
         throw new KafkaStorageException(msg, e)
     }
   }
diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
index 897d3fc..71ba9ac 100644
--- a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
+++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
@@ -39,6 +39,10 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging {
   private val offlineLogDirs = new ConcurrentHashMap[String, String]
   private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)
 
+  def hasOfflineLogDir(logDir: String): Boolean = {
+    offlineLogDirs.containsKey(logDir)
+  }
+
   /*
    * If the given logDir is not already offline, add it to the
    * set of offline log dirs and enqueue it to the logDirFailureEvent queue