You are viewing a plain text version of this content; the canonical (hyperlinked) version is available in the mailing list archive.
Posted to commits@kafka.apache.org by ij...@apache.org on 2018/06/20 08:24:34 UTC

[kafka] branch trunk updated: KAFKA-6697: Broker should not die if getCanonicalPath fails (#4752)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new fd969be  KAFKA-6697: Broker should not die if getCanonicalPath fails (#4752)
fd969be is described below

commit fd969beae7326aed70fd201f299e9ebb0e0b72a9
Author: Dong Lin <li...@gmail.com>
AuthorDate: Wed Jun 20 01:24:26 2018 -0700

    KAFKA-6697: Broker should not die if getCanonicalPath fails (#4752)
    
    A broker with multiple log dirs will die on startup if dir.getCanonicalPath() throws
    IOException for one of the log dirs. We should mark such log directory as offline
    instead and the broker should start if there is a healthy log dir.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 20 ++++++++++++-------
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 23 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index c0ac3b8..3bb5ee6 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -144,10 +144,8 @@ class LogManager(logDirs: Seq[File],
    * </ol>
    */
   private def createAndValidateLogDirs(dirs: Seq[File], initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File] = {
-    if(dirs.map(_.getCanonicalPath).toSet.size < dirs.size)
-      throw new KafkaException("Duplicate log directory found: " + dirs.mkString(", "))
-
     val liveLogDirs = new ConcurrentLinkedQueue[File]()
+    val canonicalPaths = mutable.HashSet.empty[String]
 
     for (dir <- dirs) {
       try {
@@ -155,13 +153,21 @@ class LogManager(logDirs: Seq[File],
           throw new IOException(s"Failed to load ${dir.getAbsolutePath} during broker startup")
 
         if (!dir.exists) {
-          info("Log directory '" + dir.getAbsolutePath + "' not found, creating it.")
+          info(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
           val created = dir.mkdirs()
           if (!created)
-            throw new IOException("Failed to create data directory " + dir.getAbsolutePath)
+            throw new IOException(s"Failed to create data directory ${dir.getAbsolutePath}")
         }
         if (!dir.isDirectory || !dir.canRead)
-          throw new IOException(dir.getAbsolutePath + " is not a readable log directory.")
+          throw new IOException(s"${dir.getAbsolutePath} is not a readable log directory.")
+
+        // getCanonicalPath() throws IOException if a file system query fails or if the path is invalid (e.g. contains
+        // the Nul character). Since there's no easy way to distinguish between the two cases, we treat them the same
+        // and mark the log directory as offline.
+        if (!canonicalPaths.add(dir.getCanonicalPath))
+          throw new KafkaException(s"Duplicate log directory found: ${dirs.mkString(", ")}")
+
+
         liveLogDirs.add(dir)
       } catch {
         case e: IOException =>
@@ -169,7 +175,7 @@ class LogManager(logDirs: Seq[File],
       }
     }
     if (liveLogDirs.isEmpty) {
-      fatal(s"Shutdown broker because none of the specified log dirs from " + dirs.mkString(", ") + " can be created or validated")
+      fatal(s"Shutdown broker because none of the specified log dirs from ${dirs.mkString(", ")} can be created or validated")
       Exit.halt(1)
     }
 
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index d9efc23..3fc6c1c 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -67,6 +67,29 @@ class LogManagerTest {
   @Test
   def testCreateLog() {
     val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig)
+    assertEquals(1, logManager.liveLogDirs.size)
+
+    val logFile = new File(logDir, name + "-0")
+    assertTrue(logFile.exists)
+    log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)
+  }
+
+  /**
+   * Test that getOrCreateLog on a non-existent log creates a new log and that we can append to the new log.
+   * The LogManager is configured with one invalid log directory which should be marked as offline.
+   */
+  @Test
+  def testCreateLogWithInvalidLogDir() {
+    // Configure the log dir with the Nul character as the path, which causes dir.getCanonicalPath() to throw an
+    // IOException. This simulates the scenario where the disk is not properly mounted (which is hard to achieve in
+    // a unit test)
+    val dirs = Seq(logDir, new File("\u0000"))
+
+    logManager.shutdown()
+    logManager = createLogManager(dirs)
+    logManager.startup()
+
+    val log = logManager.getOrCreateLog(new TopicPartition(name, 0), logConfig, isNew = true)
     val logFile = new File(logDir, name + "-0")
     assertTrue(logFile.exists)
     log.appendAsLeader(TestUtils.singletonRecords("test".getBytes()), leaderEpoch = 0)