You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/11/01 05:18:48 UTC

git commit: KAFKA-1107 Broker unnecessarily recovers all logs when upgrading from 0.8 to 0.8.1; reviewed by Jay Kreps and Guozhang Wang

Updated Branches:
  refs/heads/trunk a55ec0620 -> ec547737d


KAFKA-1107 Broker unnecessarily recovers all logs when upgrading from 0.8 to 0.8.1; reviewed by Jay Kreps and Guozhang Wang


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ec547737
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ec547737
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ec547737

Branch: refs/heads/trunk
Commit: ec547737de730cddb29200eba62c0165f901447b
Parents: a55ec06
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Oct 31 21:14:17 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Oct 31 21:14:17 2013 -0700

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/Log.scala         | 11 +++++++
 core/src/main/scala/kafka/log/LogManager.scala  |  6 +++-
 .../src/test/scala/unit/kafka/log/LogTest.scala | 30 +++++++++++++++++++-
 3 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/ec547737/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0cc402b..a0e1b11 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -165,6 +165,12 @@ class Log(val dir: File,
   
   private def recoverLog() {
     val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
+    val cleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile)
+    val needsRecovery = !cleanShutdownFile.exists()
+    if(!needsRecovery) {
+      this.recoveryPoint = lastOffset
+      return
+    }
     if(lastOffset <= this.recoveryPoint) {
       info("Log '%s' is fully intact, skipping recovery".format(name))
       this.recoveryPoint = lastOffset
@@ -697,6 +703,11 @@ object Log {
   /** A temporary file used when swapping files into the log */
   val SwapFileSuffix = ".swap"
 
+  /** Clean shutdown file that indicates the broker was cleanly shut down in 0.8. This is required to maintain backwards compatibility
+    * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1 */
+  /** TODO: Get rid of CleanShutdownFile in 0.8.2 */
+  val CleanShutdownFile = ".kafka_cleanshutdown"
+
   /**
    * Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
    * so that ls sorts the files numerically.

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec547737/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d489e08..390b759 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -109,8 +109,11 @@ class LogManager(val logDirs: Array[File],
       /* load the logs */
       val subDirs = dir.listFiles()
       if(subDirs != null) {
+        val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
+        if(cleanShutDownFile.exists())
+          info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
         for(dir <- subDirs) {
-          if(dir.isDirectory){
+          if(dir.isDirectory) {
             info("Loading log '" + dir.getName + "'")
             val topicPartition = parseTopicPartitionName(dir.getName)
             val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
@@ -124,6 +127,7 @@ class LogManager(val logDirs: Array[File],
               throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
           }
         }
+        cleanShutDownFile.delete()
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/kafka/blob/ec547737/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 140317c..0b516f9 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -617,5 +617,33 @@ class LogTest extends JUnitSuite {
       assertEquals(recoveryPoint, log.logEndOffset)
     }
   }
-  
+
+  @Test
+  def testCleanShutdownFile() {
+    // append some messages to create some segments
+    val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
+    val set = TestUtils.singleMessageSet("test".getBytes())
+    val parentLogDir = logDir.getParentFile
+    assertTrue("Data directory %s must exist".format(parentLogDir.getAbsolutePath), parentLogDir.isDirectory)
+    val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)
+    cleanShutdownFile.createNewFile()
+    assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
+    var recoveryPoint = 50L
+    // create a log and write some messages to it
+    var log = new Log(logDir,
+      config,
+      recoveryPoint = 0L,
+      time.scheduler,
+      time)
+    for(i <- 0 until 100)
+      log.append(set)
+    log.close()
+
+    // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
+    // clean shutdown file exists.
+    recoveryPoint = log.logEndOffset
+    log = new Log(logDir, config, 0L, time.scheduler, time)
+    assertEquals(recoveryPoint, log.logEndOffset)
+    cleanShutdownFile.delete()
+  }
 }