Posted to commits@kafka.apache.org by gw...@apache.org on 2020/07/31 02:12:56 UTC

[kafka] branch trunk updated: KAFKA-10282; Remove Log metrics immediately when deleting log

This is an automated email from the ASF dual-hosted git repository.

gwenshap pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 36b2733  KAFKA-10282; Remove Log metrics immediately when deleting log
36b2733 is described below

commit 36b273370dd840cc2bb6307f311f8952b886b323
Author: Bob Barrett <bo...@confluent.io>
AuthorDate: Thu Jul 30 19:11:56 2020 -0700

    KAFKA-10282; Remove Log metrics immediately when deleting log
    
    Currently, we remove the Log metrics when asynchronous deletion of the log is triggered. However, we attempt to register the metrics immediately upon log creation. If a Log object is re-created for a partition that is pending deletion (because a topic was quickly re-created or because a partition was moved off and back onto a broker), the registration of the new metrics can happen before the asynchronous deletion. In this case, the metrics are removed after the second registration, leaving the re-created Log without registered metrics.
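    
    A minimal, runnable sketch (not part of this patch) of the race described above, using a plain Set in place of the Yammer metrics registry and an explicit call in place of the background deletion thread; all names here (MetricsRaceSketch, createLog, asyncDelete, deleteLogs) are hypothetical stand-ins:
    
        import scala.collection.mutable
        
        // `registry` stands in for the Yammer metrics registry and
        // `logsToBeDeleted` for the LogManager's deferred-deletion queue.
        object MetricsRaceSketch extends App {
          val registry = mutable.Set.empty[String]
          val logsToBeDeleted = mutable.Queue.empty[String]
        
          def createLog(tp: String): Unit = registry += s"Log,topic=$tp"   // metrics registered on creation
          def asyncDelete(tp: String): Unit = logsToBeDeleted.enqueue(tp)  // old behavior: removal deferred
          def deleteLogs(): Unit =                                         // runs later, on the scheduler
            while (logsToBeDeleted.nonEmpty) registry -= s"Log,topic=${logsToBeDeleted.dequeue()}"
        
          createLog("metric-test-0")   // topic created, metrics registered
          asyncDelete("metric-test-0") // topic deleted, metric removal deferred with the files
          createLog("metric-test-0")   // topic quickly re-created, metrics re-registered
          deleteLogs()                 // deferred deletion fires last...
          println(registry.isEmpty)    // prints true: the live Log has lost its metrics
        }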
    
    To fix this, this patch changes the log deletion behavior to remove the metrics when the log is first marked for deletion, rather than when the files are deleted. This removes the window in which metrics registration can occur before metrics removal. This is justifiable because the log should be logically deleted when a delete request or partition movement finishes, rather than when the files are actually removed. Tested with unit tests.
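    
    A matching sketch of the patched ordering, again with hypothetical stand-in names (the real change is the removeLogAndMetrics helper in LogManager.scala below): metrics are removed synchronously when the log is marked for deletion, so a re-creation that lands before the deferred deletion simply re-registers them.
    
        import scala.collection.mutable
        
        object MetricsFixSketch extends App {
          val registry = mutable.Set.empty[String]
          val logsToBeDeleted = mutable.Queue.empty[String]
        
          def createLog(tp: String): Unit = registry += s"Log,topic=$tp"
          def asyncDelete(tp: String): Unit = {
            registry -= s"Log,topic=$tp"   // new behavior: metrics removed up front
            logsToBeDeleted.enqueue(tp)    // only the file deletion stays deferred
          }
          def deleteLogs(): Unit = logsToBeDeleted.clear() // file cleanup only, no metric removal
        
          createLog("metric-test-0")
          asyncDelete("metric-test-0")
          createLog("metric-test-0")   // re-registration happens after removal, so it sticks
          deleteLogs()
          println(registry.nonEmpty)   // prints true: the re-created Log keeps its metrics
        }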
    
    Author: Bob Barrett <bo...@confluent.io>
    
    Reviewers: David Jacot, Dhruvil Shah, Vikas Singh, Gwen Shapira
    
    Closes #9054 from bob-barrett/KAFKA-10282
---
 core/src/main/scala/kafka/log/Log.scala            |  1 -
 core/src/main/scala/kafka/log/LogManager.scala     | 92 ++++++++++++----------
 .../test/scala/unit/kafka/log/LogManagerTest.scala | 77 +++++++++++++++++-
 core/src/test/scala/unit/kafka/log/LogTest.scala   | 21 -----
 4 files changed, 126 insertions(+), 65 deletions(-)

diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index a58998e..7dccd5f 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -2051,7 +2051,6 @@ class Log(@volatile private var _dir: File,
     maybeHandleIOException(s"Error while deleting log for $topicPartition in dir ${dir.getParent}") {
       lock synchronized {
         checkIfMemoryMappedBufferClosed()
-        removeLogMetrics()
         producerExpireCheck.cancel(true)
         removeAndDeleteSegments(logSegments, asyncDelete = false)
         leaderEpochCache.foreach(_.clear())
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index e2966b0..80cd279 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -199,27 +199,22 @@ class LogManager(logDirs: Seq[File],
       if (cleaner != null)
         cleaner.handleLogDirFailure(dir)
 
-      val offlineCurrentTopicPartitions = currentLogs.collect {
-        case (tp, log) if log.parentDir == dir => tp
-      }
-      offlineCurrentTopicPartitions.foreach { topicPartition => {
-        val removedLog = currentLogs.remove(topicPartition)
-        if (removedLog != null) {
-          removedLog.closeHandlers()
-          removedLog.removeLogMetrics()
+      def removeOfflineLogs(logs: Pool[TopicPartition, Log]): Iterable[TopicPartition] = {
+        val offlineTopicPartitions: Iterable[TopicPartition] = logs.collect {
+          case (tp, log) if log.parentDir == dir => tp
         }
-      }}
+        offlineTopicPartitions.foreach { topicPartition => {
+          val removedLog = removeLogAndMetrics(logs, topicPartition)
+          removedLog.foreach {
+            log => log.closeHandlers()
+          }
+        }}
 
-      val offlineFutureTopicPartitions = futureLogs.collect {
-        case (tp, log) if log.parentDir == dir => tp
+        offlineTopicPartitions
       }
-      offlineFutureTopicPartitions.foreach { topicPartition => {
-        val removedLog = futureLogs.remove(topicPartition)
-        if (removedLog != null) {
-          removedLog.closeHandlers()
-          removedLog.removeLogMetrics()
-        }
-      }}
+
+      val offlineCurrentTopicPartitions = removeOfflineLogs(currentLogs)
+      val offlineFutureTopicPartitions = removeOfflineLogs(futureLogs)
 
       warn(s"Logs for partitions ${offlineCurrentTopicPartitions.mkString(",")} are offline and " +
            s"logs for future partitions ${offlineFutureTopicPartitions.mkString(",")} are offline due to failure on log directory $dir")
@@ -932,6 +927,7 @@ class LogManager(logDirs: Seq[File],
         val logsToCheckpoint = logsInDir(logDir)
         checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
         checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
+        sourceLog.removeLogMetrics()
         addLogToBeDeleted(sourceLog)
       } catch {
         case e: KafkaStorageException =>
@@ -957,32 +953,34 @@ class LogManager(logDirs: Seq[File],
     */
   def asyncDelete(topicPartition: TopicPartition,
                   isFuture: Boolean = false,
-                  checkpoint: Boolean = true): Log = {
-    val removedLog: Log = logCreationOrDeletionLock synchronized {
-      if (isFuture)
-        futureLogs.remove(topicPartition)
-      else
-        currentLogs.remove(topicPartition)
+                  checkpoint: Boolean = true): Option[Log] = {
+    val removedLog: Option[Log] = logCreationOrDeletionLock synchronized {
+      removeLogAndMetrics(if (isFuture) futureLogs else currentLogs, topicPartition)
     }
-    if (removedLog != null) {
-      // We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
-      if (cleaner != null && !isFuture) {
-        cleaner.abortCleaning(topicPartition)
-        if (checkpoint)
-          cleaner.updateCheckpoints(removedLog.parentDirFile)
-      }
-      removedLog.renameDir(Log.logDeleteDirName(topicPartition))
-      if (checkpoint) {
-        val logDir = removedLog.parentDirFile
-        val logsToCheckpoint = logsInDir(logDir)
-        checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
-        checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
-      }
-      addLogToBeDeleted(removedLog)
-      info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
-    } else if (offlineLogDirs.nonEmpty) {
-      throw new KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}")
+    removedLog match {
+      case Some(removedLog) =>
+        // We need to wait until there is no more cleaning task on the log to be deleted before actually deleting it.
+        if (cleaner != null && !isFuture) {
+          cleaner.abortCleaning(topicPartition)
+          if (checkpoint)
+            cleaner.updateCheckpoints(removedLog.parentDirFile)
+        }
+        removedLog.renameDir(Log.logDeleteDirName(topicPartition))
+        if (checkpoint) {
+          val logDir = removedLog.parentDirFile
+          val logsToCheckpoint = logsInDir(logDir)
+          checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
+          checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
+        }
+        addLogToBeDeleted(removedLog)
+        info(s"Log for partition ${removedLog.topicPartition} is renamed to ${removedLog.dir.getAbsolutePath} and is scheduled for deletion")
+
+      case None =>
+        if (offlineLogDirs.nonEmpty) {
+          throw new KafkaStorageException(s"Failed to delete log for ${if (isFuture) "future" else ""} $topicPartition because it may be in one of the offline directories ${offlineLogDirs.mkString(",")}")
+        }
     }
+
     removedLog
   }
 
@@ -1151,6 +1149,16 @@ class LogManager(logDirs: Seq[File],
       }
     }
   }
+
+  private def removeLogAndMetrics(logs: Pool[TopicPartition, Log], tp: TopicPartition): Option[Log] = {
+    val removedLog = logs.remove(tp)
+    if (removedLog != null) {
+      removedLog.removeLogMetrics()
+      Some(removedLog)
+    } else {
+      None
+    }
+  }
 }
 
 object LogManager {
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 05d7d71..767e1be 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -20,6 +20,8 @@ package kafka.log
 import java.io._
 import java.util.{Collections, Properties}
 
+import com.yammer.metrics.core.MetricName
+import kafka.metrics.KafkaYammerMetrics
 import kafka.server.{FetchDataInfo, FetchLogEnd}
 import kafka.server.checkpoints.OffsetCheckpointFile
 import kafka.utils._
@@ -33,6 +35,7 @@ import org.mockito.ArgumentMatchers.any
 import org.mockito.Mockito.{doAnswer, spy}
 
 import scala.collection.mutable
+import scala.jdk.CollectionConverters._
 import scala.util.{Failure, Try}
 
 class LogManagerTest {
@@ -399,7 +402,7 @@ class LogManagerTest {
     val txnIndexName = activeSegment.txnIndex.file.getName
     val indexFilesOnDiskBeforeDelete = activeSegment.log.file.getParentFile.listFiles.filter(_.getName.endsWith("index"))
 
-    val removedLog = logManager.asyncDelete(new TopicPartition(name, 0))
+    val removedLog = logManager.asyncDelete(new TopicPartition(name, 0)).get
     val removedSegment = removedLog.activeSegment
     val indexFilesAfterDelete = Seq(removedSegment.lazyOffsetIndex.file, removedSegment.lazyTimeIndex.file,
       removedSegment.txnIndex.file)
@@ -563,4 +566,76 @@ class LogManagerTest {
     logManager.topicConfigUpdated("test-topic")
     assertTrue(logManager.partitionsInitializing.isEmpty)
   }
+
+  @Test
+  def testMetricsExistWhenLogIsRecreatedBeforeDeletion(): Unit = {
+    val topicName = "metric-test"
+    def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.
+      filter(metric => metric.getType == "Log" && metric.getScope.contains(topicName))
+
+    val tp = new TopicPartition(topicName, 0)
+    val metricTag = s"topic=${tp.topic},partition=${tp.partition}"
+
+    def verifyMetrics(): Unit = {
+      assertEquals(LogMetricNames.allMetricNames.size, logMetrics.size)
+      logMetrics.foreach { metric =>
+        assertTrue(metric.getMBeanName.contains(metricTag))
+      }
+    }
+
+    // Create the Log and assert that the metrics are present
+    logManager.getOrCreateLog(tp, () => logConfig)
+    verifyMetrics()
+
+    // Trigger the deletion and assert that the metrics have been removed
+    val removedLog = logManager.asyncDelete(tp).get
+    assertTrue(logMetrics.isEmpty)
+
+    // Recreate the Log and assert that the metrics are present
+    logManager.getOrCreateLog(tp, () => logConfig)
+    verifyMetrics()
+
+    // Advance time past the file deletion delay and assert that the removed log has been deleted but the metrics
+    // are still present
+    time.sleep(logConfig.fileDeleteDelayMs + 1)
+    assertTrue(removedLog.logSegments.isEmpty)
+    verifyMetrics()
+  }
+
+  @Test
+  def testMetricsAreRemovedWhenMovingCurrentToFutureLog(): Unit = {
+    val dir1 = TestUtils.tempDir()
+    val dir2 = TestUtils.tempDir()
+    logManager = createLogManager(Seq(dir1, dir2))
+    logManager.startup()
+
+    val topicName = "future-log"
+    def logMetrics: mutable.Set[MetricName] = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala.
+      filter(metric => metric.getType == "Log" && metric.getScope.contains(topicName))
+
+    val tp = new TopicPartition(topicName, 0)
+    val metricTag = s"topic=${tp.topic},partition=${tp.partition}"
+
+    def verifyMetrics(logCount: Int): Unit = {
+      assertEquals(LogMetricNames.allMetricNames.size * logCount, logMetrics.size)
+      logMetrics.foreach { metric =>
+        assertTrue(metric.getMBeanName.contains(metricTag))
+      }
+    }
+
+    // Create the current and future logs and verify that metrics are present for both current and future logs
+    logManager.maybeUpdatePreferredLogDir(tp, dir1.getAbsolutePath)
+    logManager.getOrCreateLog(tp, () => logConfig)
+    logManager.maybeUpdatePreferredLogDir(tp, dir2.getAbsolutePath)
+    logManager.getOrCreateLog(tp, () => logConfig, isFuture = true)
+    verifyMetrics(2)
+
+    // Replace the current log with the future one and verify that only one set of metrics are present
+    logManager.replaceCurrentWithFutureLog(tp)
+    verifyMetrics(1)
+
+    // Trigger the deletion of the former current directory and verify that one set of metrics is still present
+    time.sleep(logConfig.fileDeleteDelayMs + 1)
+    verifyMetrics(1)
+  }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f89c3be..43061b6 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -4327,27 +4327,6 @@ class LogTest {
     assertEquals(1, log.numberOfSegments)
   }
 
-  @Test
-  def testMetricsRemovedOnLogDeletion(): Unit = {
-    TestUtils.clearYammerMetrics()
-
-    val logConfig = LogTest.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = createLog(logDir, logConfig)
-    val topicPartition = Log.parseTopicPartitionName(logDir)
-    val metricTag = s"topic=${topicPartition.topic},partition=${topicPartition.partition}"
-
-    val logMetrics = metricsKeySet.filter(_.getType == "Log")
-    assertEquals(LogMetricNames.allMetricNames.size, logMetrics.size)
-    logMetrics.foreach { metric =>
-      assertTrue(metric.getMBeanName.contains(metricTag))
-    }
-
-    // Delete the log and validate that corresponding metrics were removed.
-    log.delete()
-    val logMetricsAfterDeletion = metricsKeySet.filter(_.getType == "Log")
-    assertTrue(logMetricsAfterDeletion.isEmpty)
-  }
-
   private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
 
   private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = {