Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2021/09/16 17:25:58 UTC

[GitHub] [kafka] junrao commented on a change in pull request #11327: KAFKA-13305: fix NullPointerException in LogCleanerManager "uncleanable-bytes" gauge

junrao commented on a change in pull request #11327:
URL: https://github.com/apache/kafka/pull/11327#discussion_r710316445



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -103,11 +103,14 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
             val lastClean = allCleanerCheckpoints
             val now = Time.SYSTEM.milliseconds
             partitions.iterator.map { tp =>
-              val log = logs.get(tp)
-              val lastCleanOffset = lastClean.get(tp)
-              val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
-              val (_, uncleanableBytes) = calculateCleanableBytes(log, offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset)
-              uncleanableBytes
+              Option(logs.get(tp)).map(

Review comment:
       We could do the following to make the code a bit simpler and more consistent.
   
   ```
   map { log =>
     val lastCleanOffset = lastClean.get(tp)
     ...
   }
   ```
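
       A fuller sketch of how that suggestion could read, pulling the elided body from the diff above (the `.getOrElse(0L)` fallback and the trailing `sum` are assumptions for illustration, not something this comment prescribes):
   
   ```
   partitions.iterator.map { tp =>
     Option(logs.get(tp)).map { log =>
       val lastCleanOffset = lastClean.get(tp)
       val offsetsToClean = cleanableOffsets(log, lastCleanOffset, now)
       val (_, uncleanableBytes) = calculateCleanableBytes(log,
         offsetsToClean.firstDirtyOffset, offsetsToClean.firstUncleanableDirtyOffset)
       uncleanableBytes
     }.getOrElse(0L) // a log deleted concurrently contributes no uncleanable bytes (assumed fallback)
   }.sum
   ```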

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -511,6 +514,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
     }
   }
+
+  def maintainUncleanablePartitions(): Unit = {
+    // Remove deleted partitions from uncleanablePartitions
+    inLock(lock) {
+      // Remove non-existing logDir
+      // Note: we don't use retain or filterInPlace method in this function because retain in deprecated in
+      // scala 2.13 while filterInPlace is not available in scala 2.12.
+      uncleanablePartitions.filterNot {
+        case (logDir, _) => logDirs.map(_.getAbsolutePath).contains(logDir)
+      }.keys.foreach {
+        uncleanablePartitions.remove(_)
+      }
+
+      uncleanablePartitions.values.foreach {
+        // Remove deleted partitions
+        partitions => partitions.filterNot(logs.contains(_)).foreach {
+          partitions.remove(_)

Review comment:
       Hmm, does removing from a set have a side effect on an ongoing iterator? If so, we want to figure out the items to remove first and do the removal at the end. Ditto for the code above in line 527.
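
       One way to shape that, as a sketch (the `toBuffer` snapshots are the point; the names are taken from the diff, but this is not the final patch):
   
   ```
   // Snapshot the stale entries first, then mutate, so we never remove
   // from a collection while iterating over it.
   val livePaths = logDirs.map(_.getAbsolutePath)
   val staleDirs = uncleanablePartitions.keys.filterNot(livePaths.contains).toBuffer
   staleDirs.foreach(uncleanablePartitions.remove)
   
   uncleanablePartitions.values.foreach { partitions =>
     val deleted = partitions.filterNot(logs.contains).toBuffer
     deleted.foreach(partitions.remove)
   }
   ```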

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -511,6 +514,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       uncleanablePartitions.get(log.parentDir).exists(partitions => partitions.contains(topicPartition))
     }
   }
+
+  def maintainUncleanablePartitions(): Unit = {
+    // Remove deleted partitions from uncleanablePartitions
+    inLock(lock) {
+      // Remove non-existing logDir
+      // Note: we don't use retain or filterInPlace method in this function because retain in deprecated in

Review comment:
       in deprecated => is deprecated

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerIntegrationTest.scala
##########
@@ -46,7 +44,7 @@ class LogCleanerIntegrationTest extends AbstractLogCleanerIntegrationTest with K
     TestUtils.clearYammerMetrics()
   }
 
-  @Timeout(value = DEFAULT_MAX_WAIT_MS, unit = TimeUnit.MILLISECONDS)
+  @Timeout(90)

Review comment:
       Since we use mock time, why do we need a much longer timeout?
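
       (For context: with mock time, scheduled work only fires when the test advances the clock itself, so the wall-clock timeout mostly needs to cover real test overhead, not the scheduled delays. A sketch of how a test drives it, assuming Kafka's `kafka.utils.MockTime`:)
   
   ```
   val mockTime = new kafka.utils.MockTime()
   // ... start the cleaner against mockTime ...
   mockTime.sleep(housekeepingIntervalMs) // advances the mock clock instantly and fires due tasks
   ```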

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -87,17 +87,23 @@ import scala.util.control.ControlThrowable
  * @param logDirs The directories where offset checkpoints reside
  * @param logs The pool of logs
  * @param time A way to control the passage of time
+ * @param scheduler The thread pool scheduler used for background actions
  */
 class LogCleaner(initialConfig: CleanerConfig,
                  val logDirs: Seq[File],
                  val logs: Pool[TopicPartition, UnifiedLog],
                  val logDirFailureChannel: LogDirFailureChannel,
-                 time: Time = Time.SYSTEM) extends Logging with KafkaMetricsGroup with BrokerReconfigurable
+                 time: Time = Time.SYSTEM,
+                 private[log] val scheduler: Scheduler) extends Logging with KafkaMetricsGroup with BrokerReconfigurable
 {
 
   /* Log cleaner configuration which may be dynamically updated */
   @volatile private var config = initialConfig
 
+  // Visible for testing
+  val housekeepingDelayMs = 30000
+  val housekeepingIntervalMs = 30000

Review comment:
       30 seconds seems quite frequent. Could we do 5 mins?
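
       If the cadence moves to 5 minutes, the constants could become the following (a sketch; the `scheduler.schedule` hookup is assumed from the new `scheduler` parameter above, not quoted from the PR):
   
   ```
   // Visible for testing
   val housekeepingDelayMs = 5 * 60 * 1000
   val housekeepingIntervalMs = 5 * 60 * 1000
   
   // Hypothetical wiring of the housekeeping task:
   scheduler.schedule("log-cleaner-housekeeping",
     () => cleanerManager.maintainUncleanablePartitions(),
     delay = housekeepingDelayMs, period = housekeepingIntervalMs)
   ```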




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscribe@kafka.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org