You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ju...@apache.org on 2018/11/12 17:03:18 UTC

[kafka] branch trunk updated: KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848)

This is an automated email from the ASF dual-hosted git repository.

junrao pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 3eaf44b  KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848)
3eaf44b is described below

commit 3eaf44ba8ea26a7a820894390e8877d404ddd5a2
Author: huxi <hu...@hotmail.com>
AuthorDate: Tue Nov 13 01:02:44 2018 +0800

    KAFKA-7557: optimize LogManager.truncateFullyAndStartAt() (#5848)
    
    Instead of calling deleteSnapshotsAfterRecoveryPointCheckpoint for allLogs, invoke it only for the logs being truncated.
    
    Reviewers: Ismael Juma <is...@juma.me.uk>, Jun Rao <ju...@gmail.com>
---
 core/src/main/scala/kafka/log/LogManager.scala     | 60 +++++++++++++---------
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 29 +++++++++++
 2 files changed, 66 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 26bfbe9..508dcd0 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -440,6 +440,8 @@ class LogManager(logDirs: Seq[File],
       CoreUtils.swallow(cleaner.shutdown(), this)
     }
 
+    val localLogsByDir = logsByDir
+
     // close logs in each dir
     for (dir <- liveLogDirs) {
       debug(s"Flushing and closing logs at $dir")
@@ -447,7 +449,7 @@ class LogManager(logDirs: Seq[File],
       val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
       threadPools.append(pool)
 
-      val logsInDir = logsByDir.getOrElse(dir.toString, Map()).values
+      val logsInDir = localLogsByDir.getOrElse(dir.toString, Map()).values
 
       val jobsForDir = logsInDir map { log =>
         CoreUtils.runnable {
@@ -466,7 +468,7 @@ class LogManager(logDirs: Seq[File],
 
         // update the last flush point
         debug(s"Updating recovery points at $dir")
-        checkpointLogRecoveryOffsetsInDir(dir)
+        checkpointRecoveryOffsetsAndCleanSnapshot(dir, localLogsByDir.getOrElse(dir.toString, Map()).values.toSeq)
 
         debug(s"Updating log start offsets at $dir")
         checkpointLogStartOffsetsInDir(dir)
@@ -495,7 +497,7 @@ class LogManager(logDirs: Seq[File],
    * @param isFuture True iff the truncation should be performed on the future log of the specified partitions
    */
   def truncateTo(partitionOffsets: Map[TopicPartition, Long], isFuture: Boolean) {
-    var truncated = false
+    val affectedLogs = ArrayBuffer.empty[Log]
     for ((topicPartition, truncateOffset) <- partitionOffsets) {
       val log = {
         if (isFuture)
@@ -511,7 +513,7 @@ class LogManager(logDirs: Seq[File],
           cleaner.abortAndPauseCleaning(topicPartition)
         try {
           if (log.truncateTo(truncateOffset))
-            truncated = true
+            affectedLogs += log
           if (needToStopCleaner && !isFuture)
             cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
         } finally {
@@ -523,8 +525,9 @@ class LogManager(logDirs: Seq[File],
       }
     }
 
-    if (truncated)
-      checkpointLogRecoveryOffsets()
+    for ((dir, logs) <- affectedLogs.groupBy(_.dir.getParentFile)) {
+      checkpointRecoveryOffsetsAndCleanSnapshot(dir, logs)
+    }
   }
 
   /**
@@ -557,7 +560,7 @@ class LogManager(logDirs: Seq[File],
           info(s"Compaction for partition $topicPartition is resumed")
         }
       }
-      checkpointLogRecoveryOffsetsInDir(log.dir.getParentFile)
+      checkpointRecoveryOffsetsAndCleanSnapshot(log.dir.getParentFile, Seq(log))
     }
   }
 
@@ -566,7 +569,11 @@ class LogManager(logDirs: Seq[File],
    * to avoid recovering the whole log on startup.
    */
   def checkpointLogRecoveryOffsets() {
-    liveLogDirs.foreach(checkpointLogRecoveryOffsetsInDir)
+    logsByDir.foreach { case (dir, partitionToLogMap) =>
+      liveLogDirs.find(_.getAbsolutePath.equals(dir)).foreach { f =>
+        checkpointRecoveryOffsetsAndCleanSnapshot(f, partitionToLogMap.values.toSeq)
+      }
+    }
   }
 
   /**
@@ -578,21 +585,29 @@ class LogManager(logDirs: Seq[File],
   }
 
   /**
-   * Make a checkpoint for all logs in provided directory.
-   */
+    * Write the recovery checkpoint file for all logs in provided directory and clean older snapshots for provided logs.
+    *
+    * @param dir the directory in which logs are checkpointed
+    * @param logsToCleanSnapshot logs whose snapshots need to be cleaned
+    */
+  // private[log] rather than private so that tests can call it directly
+  private[log] def checkpointRecoveryOffsetsAndCleanSnapshot(dir: File, logsToCleanSnapshot: Seq[Log]): Unit = {
+    try {
+      checkpointLogRecoveryOffsetsInDir(dir)
+      logsToCleanSnapshot.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
+    } catch {
+      case e: IOException =>
+        logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
+          s"file in directory $dir", e)
+    }
+  }
+
   private def checkpointLogRecoveryOffsetsInDir(dir: File): Unit = {
     for {
       partitionToLog <- logsByDir.get(dir.getAbsolutePath)
       checkpoint <- recoveryPointCheckpoints.get(dir)
     } {
-      try {
-        checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
-        allLogs.foreach(_.deleteSnapshotsAfterRecoveryPointCheckpoint())
-      } catch {
-        case e: IOException =>
-          logDirFailureChannel.maybeAddOfflineLogDir(dir.getAbsolutePath, s"Disk error while writing to recovery point " +
-            s"file in directory $dir", e)
-      }
+      checkpoint.write(partitionToLog.mapValues(_.recoveryPoint))
     }
   }
 
@@ -802,7 +817,7 @@ class LogManager(logDirs: Seq[File],
         // Now that replica in source log directory has been successfully renamed for deletion.
         // Close the log, update checkpoint files, and enqueue this log to be deleted.
         sourceLog.close()
-        checkpointLogRecoveryOffsetsInDir(sourceLog.dir.getParentFile)
+        checkpointRecoveryOffsetsAndCleanSnapshot(sourceLog.dir.getParentFile, ArrayBuffer.empty)
         checkpointLogStartOffsetsInDir(sourceLog.dir.getParentFile)
         addLogToBeDeleted(sourceLog)
       } catch {
@@ -840,7 +855,7 @@ class LogManager(logDirs: Seq[File],
         cleaner.updateCheckpoints(removedLog.dir.getParentFile)
       }
       removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-      checkpointLogRecoveryOffsetsInDir(removedLog.dir.getParentFile)
+      checkpointRecoveryOffsetsAndCleanSnapshot(removedLog.dir.getParentFile, ArrayBuffer.empty)
       checkpointLogStartOffsetsInDir(removedLog.dir.getParentFile)
       addLogToBeDeleted(removedLog)
       info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
@@ -929,9 +944,8 @@ class LogManager(logDirs: Seq[File],
    * Map of log dir to logs by topic and partitions in that dir
    */
   private def logsByDir: Map[String, Map[TopicPartition, Log]] = {
-    (this.currentLogs.toList ++ this.futureLogs.toList).groupBy {
-      case (_, log) => log.dir.getParent
-    }.mapValues(_.toMap)
+    (this.currentLogs.toList ++ this.futureLogs.toList).toMap
+      .groupBy { case (_, log) => log.dir.getParent }
   }
 
   // logDir should be an absolute path
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 2fbb875..812dada 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -375,6 +375,35 @@ class LogManagerTest {
     assertFalse("Logs not deleted", logManager.hasLogsToBeDeleted)
   }
 
+  @Test
+  def testCheckpointForOnlyAffectedLogs() {
+    val tps = Seq(
+      new TopicPartition("test-a", 0),
+      new TopicPartition("test-a", 1),
+      new TopicPartition("test-a", 2),
+      new TopicPartition("test-b", 0),
+      new TopicPartition("test-b", 1))
+
+    val allLogs = tps.map(logManager.getOrCreateLog(_, logConfig))
+    allLogs.foreach { log =>
+      for (_ <- 0 until 50)
+        log.appendAsLeader(TestUtils.singletonRecords("test".getBytes), leaderEpoch = 0)
+      log.flush()
+    }
+
+    logManager.checkpointRecoveryOffsetsAndCleanSnapshot(this.logDir, allLogs.filter(_.dir.getName.contains("test-a")))
+
+    val checkpoints = new OffsetCheckpointFile(new File(logDir, LogManager.RecoveryPointCheckpointFile)).read()
+
+    tps.zip(allLogs).foreach { case (tp, log) =>
+      assertEquals("Recovery point should equal checkpoint", checkpoints(tp), log.recoveryPoint)
+      if (tp.topic.equals("test-a")) // should only cleanup old producer snapshots for topic 'test-a'
+        assertEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset)
+      else
+        assertNotEquals(Some(log.minSnapshotsOffsetToRetain), log.oldestProducerSnapshotOffset)
+    }
+  }
+
   private def readLog(log: Log, offset: Long, maxLength: Int = 1024): FetchDataInfo = {
     log.read(offset, maxLength, maxOffset = None, minOneMessage = true, includeAbortedTxns = false)
   }