You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ne...@apache.org on 2013/11/01 05:18:48 UTC
git commit: KAFKA-1107 Broker unnecessarily recovers all logs when
upgrading from 0.8 to 0.8.1; reviewed by Jay Kreps and Guozhang Wang
Updated Branches:
refs/heads/trunk a55ec0620 -> ec547737d
KAFKA-1107 Broker unnecessarily recovers all logs when upgrading from 0.8 to 0.8.1; reviewed by Jay Kreps and Guozhang Wang
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/ec547737
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/ec547737
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/ec547737
Branch: refs/heads/trunk
Commit: ec547737de730cddb29200eba62c0165f901447b
Parents: a55ec06
Author: Neha Narkhede <ne...@gmail.com>
Authored: Thu Oct 31 21:14:17 2013 -0700
Committer: Neha Narkhede <ne...@gmail.com>
Committed: Thu Oct 31 21:14:17 2013 -0700
----------------------------------------------------------------------
core/src/main/scala/kafka/log/Log.scala | 11 +++++++
core/src/main/scala/kafka/log/LogManager.scala | 6 +++-
.../src/test/scala/unit/kafka/log/LogTest.scala | 30 +++++++++++++++++++-
3 files changed, 45 insertions(+), 2 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec547737/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 0cc402b..a0e1b11 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -165,6 +165,12 @@ class Log(val dir: File,
private def recoverLog() {
val lastOffset = try {activeSegment.nextOffset} catch {case _: Throwable => -1L}
+ val cleanShutdownFile = new File(dir.getParentFile, CleanShutdownFile)
+ val needsRecovery = !cleanShutdownFile.exists()
+ if(!needsRecovery) {
+ this.recoveryPoint = lastOffset
+ return
+ }
if(lastOffset <= this.recoveryPoint) {
info("Log '%s' is fully intact, skipping recovery".format(name))
this.recoveryPoint = lastOffset
@@ -697,6 +703,11 @@ object Log {
/** A temporary file used when swapping files into the log */
val SwapFileSuffix = ".swap"
+ /** Clean shutdown file that indicates the broker was cleanly shut down in 0.8. This is required to maintain backwards compatibility
+ * with 0.8 and avoid unnecessary log recovery when upgrading from 0.8 to 0.8.1.
+ * TODO: Get rid of CleanShutdownFile in 0.8.2 */
+ val CleanShutdownFile = ".kafka_cleanshutdown"
+
/**
* Make log segment file name from offset bytes. All this does is pad out the offset number with zeros
* so that ls sorts the files numerically.
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec547737/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d489e08..390b759 100644
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -109,8 +109,11 @@ class LogManager(val logDirs: Array[File],
/* load the logs */
val subDirs = dir.listFiles()
if(subDirs != null) {
+ val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
+ if(cleanShutDownFile.exists())
+ info("Found clean shutdown file. Skipping recovery for all logs in data directory '%s'".format(dir.getAbsolutePath))
for(dir <- subDirs) {
- if(dir.isDirectory){
+ if(dir.isDirectory) {
info("Loading log '" + dir.getName + "'")
val topicPartition = parseTopicPartitionName(dir.getName)
val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
@@ -124,6 +127,7 @@ class LogManager(val logDirs: Array[File],
throw new IllegalArgumentException("Duplicate log directories found: %s, %s!".format(log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
}
}
+ cleanShutDownFile.delete()
}
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/ec547737/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index 140317c..0b516f9 100644
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -617,5 +617,33 @@ class LogTest extends JUnitSuite {
assertEquals(recoveryPoint, log.logEndOffset)
}
}
-
+
+ @Test
+ def testCleanShutdownFile() {
+   // append some messages to create some segments
+   val config = logConfig.copy(indexInterval = 1, maxMessageSize = 64*1024, segmentSize = 1000)
+   val set = TestUtils.singleMessageSet("test".getBytes())
+   val parentLogDir = logDir.getParentFile
+   assertTrue("Data directory %s must exist".format(parentLogDir.getAbsolutePath), parentLogDir.isDirectory)
+   // create the clean shutdown marker in the data directory that contains the log directory
+   val cleanShutdownFile = new File(parentLogDir, Log.CleanShutdownFile)
+   cleanShutdownFile.createNewFile()
+   assertTrue(".kafka_cleanshutdown must exist", cleanShutdownFile.exists())
+   try {
+     // create a log and write some messages to it
+     val log = new Log(logDir, config, recoveryPoint = 0L, time.scheduler, time)
+     for(i <- 0 until 100)
+       log.append(set)
+     log.close()
+     val expectedEndOffset = log.logEndOffset
+
+     // check if recovery was attempted. Even if the recovery point is 0L, recovery should not be attempted as the
+     // clean shutdown file exists.
+     val reopenedLog = new Log(logDir, config, 0L, time.scheduler, time)
+     assertEquals(expectedEndOffset, reopenedLog.logEndOffset)
+   } finally {
+     // always remove the marker, even on failure, so logs opened later under this directory do not skip recovery
+     cleanShutdownFile.delete()
+   }
+ }
}