Posted to jira@kafka.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/05/29 23:52:00 UTC

[jira] [Commented] (KAFKA-5812) Represent logDir with case class where absolute path is required

    [ https://issues.apache.org/jira/browse/KAFKA-5812?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16494499#comment-16494499 ] 

ASF GitHub Bot commented on KAFKA-5812:
---------------------------------------

lindong28 closed pull request #3763: KAFKA-5812; Represent logDir with case class where absolute path is required
URL: https://github.com/apache/kafka/pull/3763
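
For context, the core of this change is a thin wrapper type around the log-directory
path, so that every method which requires an absolute path says so in its signature
instead of accepting a raw String. A minimal sketch of the pattern as merged (the
fromParentOf helper is illustrative only and not part of the PR):

    import java.io.File

    // Holding an AbsoluteLogDir signals that the path has already been made absolute.
    case class AbsoluteLogDir(path: String)

    object AbsoluteLogDir {
      // Hypothetical helper mirroring the dir.getAbsoluteFile.getParent calls in the
      // diff below: resolve a log's parent directory to an absolute path.
      def fromParentOf(dir: File): AbsoluteLogDir =
        AbsoluteLogDir(dir.getAbsoluteFile.getParent)
    }

Call sites accordingly change from passing dir.getParent (which may be relative) to
passing AbsoluteLogDir(dir.getAbsoluteFile.getParent).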
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 60ec7a09ba6..e759ac3a58c 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1600,7 +1600,7 @@ class Log(@volatile var dir: File,
       fun
     } catch {
       case e: IOException =>
-        logDirFailureChannel.maybeAddOfflineLogDir(dir.getParent, msg, e)
+        logDirFailureChannel.maybeAddOfflineLogDir(AbsoluteLogDir(dir.getAbsoluteFile.getParent), msg, e)
         throw new KafkaStorageException(msg, e)
     }
   }
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 4f53b41df4f..39f6002a8f4 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -266,7 +266,7 @@ class LogCleaner(val config: CleanerConfig,
             case _: LogCleaningAbortedException => // task can be aborted, let it go.
             case e: IOException =>
               val msg = s"Failed to clean up log for ${cleanable.topicPartition} in dir ${cleanable.log.dir.getParent} due to IOException"
-              logDirFailureChannel.maybeAddOfflineLogDir(cleanable.log.dir.getParent, msg, e)
+              logDirFailureChannel.maybeAddOfflineLogDir(AbsoluteLogDir(cleanable.log.dir.getAbsoluteFile.getParent), msg, e)
           } finally {
             cleanerManager.doneCleaning(cleanable.topicPartition, cleanable.log.dir.getParentFile, endOffset)
           }
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 406800113a7..ed5b145ab47 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -153,7 +153,8 @@ class LogManager(logDirs: Array[File],
     liveLogDirs
   }
 
-  def handleLogDirFailure(dir: String) {
+  def handleLogDirFailure(absoluteLogDir: AbsoluteLogDir) {
+    val dir = absoluteLogDir.path
     info(s"Stopping serving logs in dir $dir")
     logCreationOrDeletionLock synchronized {
       _liveLogDirs.remove(new File(dir))
@@ -194,7 +195,7 @@ class LogManager(logDirs: Array[File],
         Some(lock)
       } catch {
         case e: IOException =>
-          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while locking directory $dir", e)
+          logDirFailureChannel.maybeAddOfflineLogDir(AbsoluteLogDir(dir.getAbsolutePath), s"Disk error while locking directory $dir", e)
           None
       }
     }
@@ -309,7 +310,7 @@ class LogManager(logDirs: Array[File],
         }
       }
       offlineDirs.foreach { case (dir, e) =>
-        logDirFailureChannel.maybeAddOfflineLogDir(dir, s"Error while deleting the clean shutdown file in dir $dir", e)
+        logDirFailureChannel.maybeAddOfflineLogDir(AbsoluteLogDir(dir), s"Error while deleting the clean shutdown file in dir $dir", e)
       }
     } catch {
       case e: ExecutionException => {
@@ -502,7 +503,7 @@ class LogManager(logDirs: Array[File],
         this.recoveryPointCheckpoints.get(dir).foreach(_.write(recoveryPoints.get.mapValues(_.recoveryPoint)))
       } catch {
         case e: IOException =>
-          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point file in directory $dir", e)
+          logDirFailureChannel.maybeAddOfflineLogDir(AbsoluteLogDir(dir.getAbsolutePath), s"Disk error while writing to recovery point file in directory $dir", e)
       }
     }
   }
@@ -519,7 +520,7 @@ class LogManager(logDirs: Array[File],
         ))
       } catch {
         case e: IOException =>
-          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to logStartOffset file in directory $dir", e)
+          logDirFailureChannel.maybeAddOfflineLogDir(AbsoluteLogDir(dir.getAbsolutePath), s"Disk error while writing to logStartOffset file in directory $dir", e)
       }
     }
   }
@@ -572,7 +573,7 @@ class LogManager(logDirs: Array[File],
         } catch {
           case e: IOException =>
             val msg = s"Error while creating log for $topicPartition in dir ${dataDir.getAbsolutePath}"
-            logDirFailureChannel.maybeAddOfflineLogDir(dataDir.getAbsolutePath, msg, e)
+            logDirFailureChannel.maybeAddOfflineLogDir(AbsoluteLogDir(dataDir.getAbsolutePath), msg, e)
             throw new KafkaStorageException(msg, e)
         }
       }
@@ -637,7 +638,7 @@ class LogManager(logDirs: Array[File],
       } catch {
         case e: IOException =>
           val msg = s"Error while deleting $topicPartition in dir ${removedLog.dir.getParent}."
-          logDirFailureChannel.maybeAddOfflineLogDir(removedLog.dir.getParent, msg, e)
+          logDirFailureChannel.maybeAddOfflineLogDir(AbsoluteLogDir(removedLog.dir.getAbsoluteFile.getParent), msg, e)
           throw new KafkaStorageException(msg, e)
       }
     } else if (offlineLogDirs.nonEmpty) {
@@ -701,11 +702,11 @@ class LogManager(logDirs: Array[File],
     }
   }
 
-  def isLogDirOnline(logDir: String): Boolean = {
-    if (!logDirs.exists(_.getAbsolutePath == logDir))
-      throw new RuntimeException(s"Log dir $logDir is not found in the config.")
+  def isLogDirOnline(absoluteLogDir: AbsoluteLogDir): Boolean = {
+    if (!logDirs.exists(_.getAbsolutePath == absoluteLogDir.path))
+      throw new RuntimeException(s"Log dir $absoluteLogDir is not found in the config.")
 
-    _liveLogDirs.contains(new File(logDir))
+    _liveLogDirs.contains(new File(absoluteLogDir.path))
   }
 
   /**
@@ -758,8 +759,8 @@ object LogManager {
       backOffMs = config.logCleanerBackoffMs,
       enableCleaner = config.logCleanerEnable)
 
-    new LogManager(logDirs = config.logDirs.map(new File(_)).toArray,
-      initialOfflineDirs = initialOfflineDirs.map(new File(_)).toArray,
+    new LogManager(logDirs = config.logDirs.map(new File(_).getAbsoluteFile).toArray,
+      initialOfflineDirs = initialOfflineDirs.map(new File(_).getAbsoluteFile).toArray,
       topicConfigs = topicConfigs,
       defaultConfig = defaultLogConfig,
       cleanerConfig = cleanerConfig,
@@ -776,3 +777,7 @@ object LogManager {
       time = time)
   }
 }
+
+// Wrap the log directory path in AbsoluteLogDir whenever an absolute log directory path is required
+case class AbsoluteLogDir(path: String)
+
diff --git a/core/src/main/scala/kafka/server/KafkaServer.scala b/core/src/main/scala/kafka/server/KafkaServer.scala
index 1270e2f65b2..a9f2d9da8ff 100755
--- a/core/src/main/scala/kafka/server/KafkaServer.scala
+++ b/core/src/main/scala/kafka/server/KafkaServer.scala
@@ -30,7 +30,7 @@ import kafka.common.{GenerateBrokerIdException, InconsistentBrokerIdException}
 import kafka.controller.KafkaController
 import kafka.coordinator.group.GroupCoordinator
 import kafka.coordinator.transaction.TransactionCoordinator
-import kafka.log.{LogConfig, LogManager}
+import kafka.log.{AbsoluteLogDir, LogConfig, LogManager}
 import kafka.metrics.{KafkaMetricsGroup, KafkaMetricsReporter}
 import kafka.network.{BlockingChannel, SocketServer}
 import kafka.security.CredentialProvider
@@ -47,7 +47,7 @@ import org.apache.kafka.common.utils.{AppInfoParser, Time}
 import org.apache.kafka.common.{ClusterResource, Node}
 
 import scala.collection.JavaConverters._
-import scala.collection.{Seq, Map, mutable}
+import scala.collection.{Map, Seq, mutable}
 
 object KafkaServer {
   // Copy the subset of properties that are relevant to Logs
@@ -618,10 +618,10 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP
   private def checkpointBrokerId(brokerId: Int) {
     var logDirsWithoutMetaProps: List[String] = List()
 
-    for (logDir <- logManager.liveLogDirs) {
-      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir.getAbsolutePath).read()
+    for (logDir <- config.logDirs if logManager.isLogDirOnline(AbsoluteLogDir(new File(logDir).getAbsolutePath))) {
+      val brokerMetadataOpt = brokerMetadataCheckpoints(logDir).read()
       if(brokerMetadataOpt.isEmpty)
-        logDirsWithoutMetaProps ++= List(logDir.getAbsolutePath)
+        logDirsWithoutMetaProps ++= List(logDir)
     }
 
     for(logDir <- logDirsWithoutMetaProps) {
diff --git a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
index c78f04ecf82..ae2b50ebc7b 100644
--- a/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
+++ b/core/src/main/scala/kafka/server/LogDirFailureChannel.scala
@@ -21,6 +21,7 @@ package kafka.server
 import java.io.IOException
 import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}
 
+import kafka.log.AbsoluteLogDir
 import kafka.utils.Logging
 
 /*
@@ -36,17 +37,17 @@ import kafka.utils.Logging
  */
 class LogDirFailureChannel(logDirNum: Int) extends Logging {
 
-  private val offlineLogDirs = new ConcurrentHashMap[String, String]
-  private val offlineLogDirQueue = new ArrayBlockingQueue[String](logDirNum)
+  private val offlineLogDirs = new ConcurrentHashMap[String, AbsoluteLogDir]
+  private val offlineLogDirQueue = new ArrayBlockingQueue[AbsoluteLogDir](logDirNum)
 
   /*
    * If the given logDir is not already offline, add it to the
    * set of offline log dirs and enqueue it to the logDirFailureEvent queue
    */
-  def maybeAddOfflineLogDir(logDir: String, msg: => String, e: IOException): Unit = {
+  def maybeAddOfflineLogDir(absoluteLogDir: AbsoluteLogDir, msg: => String, e: IOException): Unit = {
     error(msg, e)
-    if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) {
-      offlineLogDirQueue.add(logDir)
+    if (offlineLogDirs.putIfAbsent(absoluteLogDir.path, absoluteLogDir) == null) {
+      offlineLogDirQueue.add(absoluteLogDir)
     }
   }
 
@@ -54,6 +55,6 @@ class LogDirFailureChannel(logDirNum: Int) extends Logging {
    * Get the next offline log dir from logDirFailureEvent queue.
    * The method will wait if necessary until a new offline log directory becomes available
    */
-  def takeNextOfflineLogDir(): String = offlineLogDirQueue.take()
+  def takeNextOfflineLogDir(): AbsoluteLogDir = offlineLogDirQueue.take()
 
 }
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 11e53447f8a..5990f168e60 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -24,13 +24,13 @@ import com.yammer.metrics.core.Gauge
 import kafka.api._
 import kafka.cluster.{Partition, Replica}
 import kafka.controller.KafkaController
-import kafka.log.{Log, LogAppendInfo, LogManager}
+import kafka.log.{AbsoluteLogDir, Log, LogAppendInfo, LogManager}
 import kafka.metrics.KafkaMetricsGroup
 import kafka.server.QuotaFactory.UnboundedQuota
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
 import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.errors.{KafkaStorageException, ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
+import org.apache.kafka.common.errors.{ControllerMovedException, CorruptRecordException, InvalidTimestampException, InvalidTopicException, KafkaStorageException, NotEnoughReplicasException, NotLeaderForPartitionException, OffsetOutOfRangeException, PolicyViolationException, _}
 import org.apache.kafka.common.internals.Topic
 import org.apache.kafka.common.metrics.Metrics
 import org.apache.kafka.common.protocol.Errors
@@ -1230,10 +1230,11 @@ class ReplicaManager(val config: KafkaConfig,
     }
   }
 
-  def handleLogDirFailure(dir: String) {
-    if (!logManager.isLogDirOnline(dir))
+  def handleLogDirFailure(absoluteLogDir: AbsoluteLogDir) {
+    if (!logManager.isLogDirOnline(absoluteLogDir))
       return
 
+    val dir = absoluteLogDir.path
     info(s"Stopping serving replicas in dir $dir")
     replicaStateChangeLock synchronized {
       val newOfflinePartitions = allPartitions.values.filter { partition =>
@@ -1264,7 +1265,7 @@ class ReplicaManager(val config: KafkaConfig,
       info("Broker %d stopped fetcher for partitions %s because they are in the failed log dir %s"
         .format(localBrokerId, newOfflinePartitions.mkString(", "), dir))
     }
-    logManager.handleLogDirFailure(dir)
+    logManager.handleLogDirFailure(absoluteLogDir)
     LogDirUtils.propagateLogDirEvent(zkUtils, localBrokerId)
     info(s"Stopped serving replicas in dir $dir")
   }
diff --git a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
index 4c1011f9429..97c847a98c4 100644
--- a/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
@@ -20,6 +20,7 @@ import java.io._
 import java.nio.charset.StandardCharsets
 import java.nio.file.{FileAlreadyExistsException, Files, Paths}
 
+import kafka.log.AbsoluteLogDir
 import kafka.server.LogDirFailureChannel
 import kafka.utils.Logging
 import org.apache.kafka.common.errors.KafkaStorageException
@@ -37,7 +38,7 @@ class CheckpointFile[T](val file: File,
                         version: Int,
                         formatter: CheckpointFileFormatter[T],
                         logDirFailureChannel: LogDirFailureChannel,
-                        logDir: String) extends Logging {
+                        absoluteLogDir: AbsoluteLogDir) extends Logging {
   private val path = file.toPath.toAbsolutePath
   private val tempPath = Paths.get(path.toString + ".tmp")
   private val lock = new Object()
@@ -73,7 +74,7 @@ class CheckpointFile[T](val file: File,
       } catch {
         case e: IOException =>
           val msg = s"Error while writing to checkpoint file ${file.getAbsolutePath}"
-          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+          logDirFailureChannel.maybeAddOfflineLogDir(absoluteLogDir, msg, e)
           throw new KafkaStorageException(msg, e)
       }
     }
@@ -121,7 +122,7 @@ class CheckpointFile[T](val file: File,
       } catch {
         case e: IOException =>
           val msg = s"Error while reading checkpoint file ${file.getAbsolutePath}"
-          logDirFailureChannel.maybeAddOfflineLogDir(logDir, msg, e)
+          logDirFailureChannel.maybeAddOfflineLogDir(absoluteLogDir, msg, e)
           throw new KafkaStorageException(msg, e)
       }
     }
diff --git a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
index a8db688ac13..52b6596b257 100644
--- a/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
@@ -19,6 +19,7 @@ package kafka.server.checkpoints
 import java.io._
 import java.util.regex.Pattern
 
+import kafka.log.AbsoluteLogDir
 import kafka.server.LogDirFailureChannel
 import kafka.server.epoch.EpochEntry
 
@@ -59,7 +60,7 @@ object LeaderEpochCheckpointFile {
 class LeaderEpochCheckpointFile(val file: File, logDirFailureChannel: LogDirFailureChannel = null) extends LeaderEpochCheckpoint {
   import LeaderEpochCheckpointFile._
 
-  val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, file.getParentFile.getParent)
+  val checkpoint = new CheckpointFile[EpochEntry](file, CurrentVersion, Formatter, logDirFailureChannel, AbsoluteLogDir(file.getParentFile.getAbsoluteFile.getParent))
 
   def write(epochs: Seq[EpochEntry]): Unit = checkpoint.write(epochs)
 
diff --git a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
index 9cd096369e2..189d12173c5 100644
--- a/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
+++ b/core/src/main/scala/kafka/server/checkpoints/OffsetCheckpointFile.scala
@@ -19,6 +19,7 @@ package kafka.server.checkpoints
 import java.io._
 import java.util.regex.Pattern
 
+import kafka.log.AbsoluteLogDir
 import kafka.server.LogDirFailureChannel
 import kafka.server.epoch.EpochEntry
 import org.apache.kafka.common.TopicPartition
@@ -54,7 +55,7 @@ trait OffsetCheckpoint {
   */
 class OffsetCheckpointFile(val f: File, logDirFailureChannel: LogDirFailureChannel = null) {
   val checkpoint = new CheckpointFile[(TopicPartition, Long)](f, OffsetCheckpointFile.CurrentVersion,
-    OffsetCheckpointFile.Formatter, logDirFailureChannel, f.getParent)
+    OffsetCheckpointFile.Formatter, logDirFailureChannel, AbsoluteLogDir(f.getAbsoluteFile.getParent))
 
   def write(offsets: Map[TopicPartition, Long]): Unit = checkpoint.write(offsets.toSeq)
 
diff --git a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
index f4998f6397b..2b1f4dbe93c 100644
--- a/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
+++ b/core/src/test/scala/unit/kafka/server/checkpoints/OffsetCheckpointFileTest.scala
@@ -16,6 +16,7 @@
   */
 package kafka.server.checkpoints
 
+import kafka.log.AbsoluteLogDir
 import kafka.server.LogDirFailureChannel
 import kafka.utils.{Logging, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -94,7 +95,7 @@ class OffsetCheckpointFileTest extends JUnitSuite with Logging {
     val file = TestUtils.tempFile()
     val logDirFailureChannel = new LogDirFailureChannel(10)
     val checkpointFile = new CheckpointFile(file, OffsetCheckpointFile.CurrentVersion + 1,
-      OffsetCheckpointFile.Formatter, logDirFailureChannel, file.getParent)
+      OffsetCheckpointFile.Formatter, logDirFailureChannel, AbsoluteLogDir(file.getParent))
     checkpointFile.write(Seq(new TopicPartition("foo", 5) -> 10L))
     new OffsetCheckpointFile(checkpointFile.file, logDirFailureChannel).read()
   }
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index ab840eed24c..9e1194483cb 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -230,7 +230,11 @@ object TestUtils extends Logging {
     if (nodeId >= 0) props.put(KafkaConfig.BrokerIdProp, nodeId.toString)
     props.put(KafkaConfig.ListenersProp, listeners)
     if (logDirCount > 1) {
-      val logDirs = (1 to logDirCount).toList.map(i => TestUtils.tempDir().getAbsolutePath).mkString(",")
+      val logDirs = (1 to logDirCount).toList.map(i =>
+        // Users may specify either a relative or an absolute path as a log directory, for backward compatibility.
+        // We verify this by using a mixture of relative and absolute paths as log directories in the test.
+        if (i % 2 == 0) TestUtils.tempDir().getAbsolutePath else TestUtils.tempRelativeDir("data")
+      ).mkString(",")
       props.put(KafkaConfig.LogDirsProp, logDirs)
     } else {
       props.put(KafkaConfig.LogDirProp, TestUtils.tempDir().getAbsolutePath)

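A design note on the patch above: case class AbsoluteLogDir(path: String) does not
itself verify that the path it holds is absolute; call sites are trusted to construct
it from getAbsolutePath or getAbsoluteFile. A stricter variant (a sketch only, not
what was merged) could enforce the invariant at construction time:

    import java.io.File

    case class AbsoluteLogDir(path: String) {
      // Illustrative fail-fast check; the merged code relies on callers instead.
      require(new File(path).isAbsolute, s"Log dir $path is not an absolute path")
    }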

 
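The TestUtils change exercises backward compatibility by mixing relative and absolute
entries in log.dirs; this works because LogManager now normalizes every configured
directory through getAbsoluteFile before use. A tiny standalone illustration of that
normalization (the paths are made up):

    import java.io.File

    // One relative and one absolute entry, as they might appear in log.dirs.
    val configured = Seq("data/logs", "/var/kafka/logs")

    // LogManager.apply now maps each entry through getAbsoluteFile, so both
    // resolve to absolute paths and are keyed consistently in _liveLogDirs.
    val normalized = configured.map(d => new File(d).getAbsoluteFile)
    normalized.foreach(d => assert(d.isAbsolute))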

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Represent logDir with case class where absolute path is required
> ----------------------------------------------------------------
>
>                 Key: KAFKA-5812
>                 URL: https://issues.apache.org/jira/browse/KAFKA-5812
>             Project: Kafka
>          Issue Type: Improvement
>            Reporter: Dong Lin
>            Assignee: Dong Lin
>            Priority: Minor
>




--
This message was sent by Atlassian JIRA
(v7.6.3#76005)