Posted to commits@kafka.apache.org by ju...@apache.org on 2018/11/12 17:03:18 UTC
[kafka] branch trunk updated: KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848)
This is an automated email from the ASF dual-hosted git repository.
junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 3eaf44b KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848)
3eaf44b is described below
commit 3eaf44ba8ea26a7a820894390e8877d404ddd5a2
Author: huxi <hu...@hotmail.com>
AuthorDate: Tue Nov 13 01:02:44 2018 +0800
KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848)
Instead of calling deleteSnapshotsAfterRecoveryPointCheckpoint for allLogs, invoke it only for the logs being truncated.
Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
core/src/main/scala/kafka/log/LogManager.scala | 60 +++++++++++++---------
.../test/scala/unit/kafka/log/LogManagerTest.scala | 29 +++++++++++
2 files changed, 66 insertions(+), 23 deletions(-)
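The essence of the change: rather than sweeping producer-state snapshots for every log on the broker on each recovery-point checkpoint, the new checkpointRecoveryOffsetsAndCleanSnapshot takes the affected logs explicitly. A minimal compilable sketch of the before/after shape, using stand-in types and a hypothetical writeRecoveryCheckpoint helper rather than the real Kafka classes:

    import java.io.File

    // Stand-in for kafka.log.Log; only the method relevant here.
    trait Log {
      def deleteSnapshotsAfterRecoveryPointCheckpoint(): Unit
    }

    class CheckpointSketch(allLogs: Seq[Log]) {
      // Hypothetical helper standing in for the recovery-point checkpoint write.
      private def writeRecoveryCheckpoint(dir: File): Unit = ()

      // Before: every checkpoint swept snapshots for every log on the broker,
      // making even a single-partition truncation cost O(total logs).
      def checkpointBefore(dir: File): Unit = {
        writeRecoveryCheckpoint(dir)
        allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
      }

      // After: the caller names the logs it touched, so snapshot cleanup is
      // proportional to the affected set.
      def checkpointAfter(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
        writeRecoveryCheckpoint(dir)
        logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
      }
    }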
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 26bfbe9..508dcd0 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -440,6 +440,8 @@ class LogManager(logDirs: Seq[File],
CoreUtils.swallow(cleaner.shutdown(), this)
}
+ val localLogsByDir = logsByDir
+
// close logs in each dir
for (dir <- liveLogDirs) {
debug(s"Flushing and closing logs at $dir")
@@ -447,7 +449,7 @@ class LogManager(logDirs: Seq[File],
val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
threadPools.append(pool)
- val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
+ val logsInDir = localLogsByDir.getOrElse(dir.toString, Map()).values
val jobsForDir = logsInDir map { log =>
CoreUtils.runnable {
@@ -466,7 +468,7 @@ class LogManager(logDirs: Seq[File],
// update the last flush point
debug(s"Updating recovery points at $dir")
- checkpointLogRecoveryOffsetsInDir(dir)
+ checkpointRecoveryOffsetsAndCleanSnapshot(dir, localLogsByDir.getOrElse(dir.toString, Map()).values.toSeq)
debug(s"Updating log start offsets at $dir")
checkpointLogStartOffsetsInDir(dir)
@@ -495,7 +497,7 @@ class LogManager(logDirs: Seq[File],
* @param isFuture True iff the truncation should be performed on the future log of the specified partitions
*/
def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean) {
- var truncated = false
+ val affectedLogs = ArrayBuffer.empty[Log]
for ((topicPartition, truncateOffset) <- partitionOffsets) {
val log = {
if (isFuture)
@@ -511,7 +513,7 @@ class LogManager(logDirs: Seq[File],
cleaner.abortAndPauseCleaning(topicPartition)
try {
if (log.truncateTo(truncateOffset))
- truncated = true
+ affectedLogs += log
if (needToStopCleaner && !isFuture)
cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
} finally {
@@ -523,8 +525,9 @@ class LogManager(logDirs: Seq[File],
}
}
- if (truncated)
- checkpointLogRecoveryOffsets()
+ for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) {
+ checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs)
+ }
}
/**
@@ -557,7 +560,7 @@ class LogManager(logDirs: Seq[File],
info(s"Compaction for partition $topicPartition is resumed")
}
}
- checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
+ checkpointRecoveryOffsetsAndCleanSnapshot(log.dir.getParentFile, Seq(log))
}
}
@@ -566,7 +569,11 @@ class LogManager(logDirs: Seq[File],
* to avoid recovering the whole log on startup.
*/
def checkpointLogRecoveryOffsets() {
- liveLogDirs.foreach(checkpointLogRecoveryOffsetsInDir)
+ logsByDir.foreach { case (dir, partitionToLogMap) =>
+ liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f =>
+ checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq)
+ }
+ }
}
/**
@@ -578,21 +585,29 @@ class LogManager(logDirs: Seq[File],
}
/**
- * Make a checkpoint for all logs in provided directory.
- */
+ * Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs.
+ *
+ * @param dir the directory in which logs are checkpointed
+ * @param logsToCleanSnapshot logs whose snapshots need to be cleaned
+ */
+ // Only for testing
+ private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
+ try {
+ checkpointLogRecoveryOffsetsInDir(dir)
+ logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
+ } catch {
+ case e: IOException =>
+ logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
+ s"file in directory $dir", e)
+ }
+ }
+
private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
for {
partitionToLog <- logsByDir.get(dir.getAbsolutePath)
checkpoint <- recoveryPointCheckpoints.get(dir)
} {
- try {
- checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
- allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
- } catch {
- case e: IOException =>
- logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
- s"file in directory $dir", e)
- }
+ checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
}
}
@@ -802,7 +817,7 @@ class LogManager(logDirs: Seq[File],
// Now that replica in source log directory has been successfully renamed for deletion.
// Close the log, update checkpoint files, and enqueue this log to be deleted.
sourceLog.close()
- checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
+ checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.dir.getParentFile, ArrayBuffer.empty)
checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
addLogToBeDeleted(sourceLog)
} catch {
@@ -840,7 +855,7 @@ class LogManager(logDirs: Seq[File],
cleaner.updateCheckpoints(removedLog.dir.getParentFile)
}
removedLog.renameDir(Log.logDeleteDirName(topicPartition))
- checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
+ checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty)
checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
addLogToBeDeleted(removedLog)
info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
@@ -929,9 +944,8 @@ class LogManager(logDirs: Seq[File],
* Map of log dir to logs by topic and partitions in that dir
*/
private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
- (this.currentLogs.toList ++ this.futureLogs.toList).groupBy {
- case (_, log) => log.dir.getParent
- }.mapValues(_.toMap)
+ (this.currentLogs.toList ++ this.futureLogs.toList).toMap
+ .groupBy { case (_, log) => log.dir.getParent }
}
// logDir should be an absolute path
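One subtlety in the logsByDir rewrite at the end of this file: calling toMap before groupBy keeps the last binding per key, so when a partition has both a current and a future log, the future entry (appended second) is the one retained. A self-contained sketch of the new shape, with TopicPartition and Log as stand-ins for the real Kafka types:

    import java.io.File

    final case class TopicPartition(topic: String, partition: Int)
    final case class Log(dir: File)

    // Mirrors the reworked logsByDir: concatenate, deduplicate via toMap,
    // then group entries by the parent directory of each log.
    def logsByDir(currentLogs: Map[TopicPartition, Log],
                  futureLogs: Map[TopicPartition, Log]): Map[String, Map[TopicPartition, Log]] =
      (currentLogs.toList ++ futureLogs.toList).toMap
        .groupBy { case (_, log) => log.dir.getParent }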
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 2fbb875..812dada 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -375,6 +375,35 @@ class LogManagerTest {
assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted)
}
+ @Test
+ def testCheckpointForOnlyAffectedLogs() {
+ val tps = Seq(
+ new TopicPartition("test-a", 0),
+ new TopicPartition("test-a", 1),
+ new TopicPartition("test-a", 2),
+ new TopicPartition("test-b", 0),
+ new TopicPartition("test-b", 1))
+
+ val allLogs = tps.map(logManager.getOrCreateLog(_, logConfig))
+ allLogs.foreach { log =>
+ for (_ <- 0 until 50)
+ log.appendAsLeader(TestUtils.singletonRecords("test".getBytes), leaderEpoch = 0)
+ log.flush()
+ }
+
+ logManager.checkpointRecoveryOffsetsAndCleanSnapshot(this.logDir, allLogs.filter(_.dir.getName.contains("test-a")))
+
+ val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read()
+
+ tps.zip(allLogs).foreach { case (tp, log) =>
+ assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
+ if (tp.topic.equals("test-a")) // should only cleanup old producer snapshots for topic 'test-a'
+ assertEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset)
+ else
+ assertNotEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset)
+ }
+ }
+
private def readLog(log: Log, offset: Long, maxLength: Int = 1024): FetchDataInfo = {
log.read(offset, maxLength, maxOffset = None, minOneMessage = true, includeAbortedTxns = false)
}