You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/04/20 10:45:11 UTC

[kafka] branch trunk updated: KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (#12004)

This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 301c6f44d6 KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (#12004)
301c6f44d6 is described below

commit 301c6f44d605f7152f25057c599eeb0b01b2aef9
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Wed Apr 20 12:45:03 2022 +0200

    KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (#12004)
    
    
    Reviewers: Mickael Maison <mi...@gmail.com>
---
 .../main/scala/kafka/log/LogCleanerManager.scala   | 14 +++++-----
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 32 ++++++++++++----------
 2 files changed, 24 insertions(+), 22 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 89d4686e28..48f4d49b6d 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -64,7 +64,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   import LogCleanerManager._
 
 
-  protected override def loggerName = classOf[LogCleaner].getName
+  protected override def loggerName: String = classOf[LogCleaner].getName
 
   // package-private for testing
   private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
@@ -400,11 +400,11 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       try {
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
-            debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
+            debug(s"Removing the partition offset data in checkpoint file for '$topicPartition' " +
               s"from ${sourceLogDir.getAbsoluteFile} directory.")
             updateCheckpoints(sourceLogDir, partitionToRemove = Option(topicPartition))
 
-            debug(s"Adding the partition offset data in checkpoint file for '${topicPartition}' " +
+            debug(s"Adding the partition offset data in checkpoint file for '$topicPartition' " +
               s"to ${destLogDir.getAbsoluteFile} directory.")
             updateCheckpoints(destLogDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
           case None =>
@@ -525,15 +525,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
       // Remove deleted partitions
       uncleanablePartitions.values.foreach {
         partitions =>
-          val partitionsToRemove = partitions.filterNot(logs.contains(_)).toList
-          partitionsToRemove.foreach { partitions.remove(_) }
+          val partitionsToRemove = partitions.filterNot(logs.contains).toList
+          partitionsToRemove.foreach { partitions.remove }
       }
 
       // Remove entries with empty partition set.
       val logDirsToRemove = uncleanablePartitions.filter {
         case (_, partitions) => partitions.isEmpty
-      }.map { _._1}.toList
-      logDirsToRemove.foreach { uncleanablePartitions.remove(_) }
+      }.keys.toList
+      logDirsToRemove.foreach { uncleanablePartitions.remove }
     }
   }
 }
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 0cdafed127..fdc05c74f8 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -36,17 +36,17 @@ import scala.collection.mutable
   */
 class LogCleanerManagerTest extends Logging {
 
-  val tmpDir = TestUtils.tempDir()
-  val tmpDir2 = TestUtils.tempDir()
-  val logDir = TestUtils.randomPartitionLogDir(tmpDir)
-  val logDir2 = TestUtils.randomPartitionLogDir(tmpDir)
+  val tmpDir: File = TestUtils.tempDir()
+  val tmpDir2: File = TestUtils.tempDir()
+  val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+  val logDir2: File = TestUtils.randomPartitionLogDir(tmpDir)
   val topicPartition = new TopicPartition("log", 0)
   val topicPartition2 = new TopicPartition("log2", 0)
   val logProps = new Properties()
   logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
   logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
-  val logConfig = LogConfig(logProps)
+  val logConfig: LogConfig = LogConfig(logProps)
   val time = new MockTime(1400000000000L, 1000L)  // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
   val offset = 999
 
@@ -394,7 +394,7 @@ class LogCleanerManagerTest extends Logging {
     val cleanerManager: LogCleanerManager = createCleanerManager(log)
 
     // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
-    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.getOrElse(topicPartition, 0))
 
     cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
     // expect the checkpoint offset is now updated to the expected offset after doing updateCheckpoints
@@ -413,7 +413,7 @@ class LogCleanerManagerTest extends Logging {
 
     // updateCheckpoints should remove the topicPartition data in the logDir
     cleanerManager.updateCheckpoints(logDir, partitionToRemove = Option(topicPartition))
-    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+    assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
   }
 
   @Test
@@ -431,7 +431,7 @@ class LogCleanerManagerTest extends Logging {
     cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
     // verify the partition data in logDir is gone, and data in logDir2 is still there
     assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition2))
-    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+    assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
   }
 
   @Test
@@ -471,7 +471,7 @@ class LogCleanerManagerTest extends Logging {
 
     // force delete the logDir2 from checkpoints, so that the partition data should also be deleted
     cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath)
-    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+    assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
   }
 
   /**
@@ -710,14 +710,16 @@ class LogCleanerManagerTest extends Logging {
     assertThrows(classOf[IllegalStateException], () => cleanerManager.doneCleaning(topicPartition, log.dir, 1))
 
     cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress)
-    cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+    val endOffset = 1L
+    cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
     assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
-    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
+    assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
+    assertEquals(Some(endOffset), cleanerManager.allCleanerCheckpoints.get(topicPartition))
 
     cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
-    cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+    cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
     assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
-    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
+    assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
   }
 
   @Test
@@ -755,7 +757,7 @@ class LogCleanerManagerTest extends Logging {
 
     val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
     assertEquals(None, filthiestLog, "Log should not be selected for cleaning")
-    assertEquals(20L, cleanerCheckpoints.get(tp).get, "Unselected log should have checkpoint offset updated")
+    assertEquals(20L, cleanerCheckpoints(tp), "Unselected log should have checkpoint offset updated")
   }
 
   /**
@@ -777,7 +779,7 @@ class LogCleanerManagerTest extends Logging {
 
     val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
     assertEquals(tp1, filthiestLog.topicPartition, "Dirtier log should be selected")
-    assertEquals(15L, cleanerCheckpoints.get(tp0).get, "Unselected log should have checkpoint offset updated")
+    assertEquals(15L, cleanerCheckpoints(tp0), "Unselected log should have checkpoint offset updated")
   }
 
   private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {