You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@kafka.apache.org by mi...@apache.org on 2022/04/20 10:45:11 UTC
[kafka] branch trunk updated: KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (#12004)
This is an automated email from the ASF dual-hosted git repository.
mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 301c6f44d6 KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (#12004)
301c6f44d6 is described below
commit 301c6f44d605f7152f25057c599eeb0b01b2aef9
Author: Divij Vaidya <di...@amazon.com>
AuthorDate: Wed Apr 20 12:45:03 2022 +0200
KAFKA-10095: Add stricter assertion in LogCleanerManagerTest (#12004)
Reviewers: Mickael Maison <mi...@gmail.com>
---
.../main/scala/kafka/log/LogCleanerManager.scala | 14 +++++-----
.../unit/kafka/log/LogCleanerManagerTest.scala | 32 ++++++++++++----------
2 files changed, 24 insertions(+), 22 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 89d4686e28..48f4d49b6d 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -64,7 +64,7 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
import LogCleanerManager._
- protected override def loggerName = classOf[LogCleaner].getName
+ protected override def loggerName: String = classOf[LogCleaner].getName
// package-private for testing
private[log] val offsetCheckpointFile = "cleaner-offset-checkpoint"
@@ -400,11 +400,11 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
try {
checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
case Some(offset) =>
- debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
+ debug(s"Removing the partition offset data in checkpoint file for '$topicPartition' " +
s"from ${sourceLogDir.getAbsoluteFile} directory.")
updateCheckpoints(sourceLogDir, partitionToRemove = Option(topicPartition))
- debug(s"Adding the partition offset data in checkpoint file for '${topicPartition}' " +
+ debug(s"Adding the partition offset data in checkpoint file for '$topicPartition' " +
s"to ${destLogDir.getAbsoluteFile} directory.")
updateCheckpoints(destLogDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
case None =>
@@ -525,15 +525,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
// Remove deleted partitions
uncleanablePartitions.values.foreach {
partitions =>
- val partitionsToRemove = partitions.filterNot(logs.contains(_)).toList
- partitionsToRemove.foreach { partitions.remove(_) }
+ val partitionsToRemove = partitions.filterNot(logs.contains).toList
+ partitionsToRemove.foreach { partitions.remove }
}
// Remove entries with empty partition set.
val logDirsToRemove = uncleanablePartitions.filter {
case (_, partitions) => partitions.isEmpty
- }.map { _._1}.toList
- logDirsToRemove.foreach { uncleanablePartitions.remove(_) }
+ }.keys.toList
+ logDirsToRemove.foreach { uncleanablePartitions.remove }
}
}
}
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 0cdafed127..fdc05c74f8 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -36,17 +36,17 @@ import scala.collection.mutable
*/
class LogCleanerManagerTest extends Logging {
- val tmpDir = TestUtils.tempDir()
- val tmpDir2 = TestUtils.tempDir()
- val logDir = TestUtils.randomPartitionLogDir(tmpDir)
- val logDir2 = TestUtils.randomPartitionLogDir(tmpDir)
+ val tmpDir: File = TestUtils.tempDir()
+ val tmpDir2: File = TestUtils.tempDir()
+ val logDir: File = TestUtils.randomPartitionLogDir(tmpDir)
+ val logDir2: File = TestUtils.randomPartitionLogDir(tmpDir)
val topicPartition = new TopicPartition("log", 0)
val topicPartition2 = new TopicPartition("log2", 0)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.SegmentIndexBytesProp, 1024: java.lang.Integer)
logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
- val logConfig = LogConfig(logProps)
+ val logConfig: LogConfig = LogConfig(logProps)
val time = new MockTime(1400000000000L, 1000L) // Tue May 13 16:53:20 UTC 2014 for `currentTimeMs`
val offset = 999
@@ -394,7 +394,7 @@ class LogCleanerManagerTest extends Logging {
val cleanerManager: LogCleanerManager = createCleanerManager(log)
// expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
- assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+ assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.getOrElse(topicPartition, 0))
cleanerManager.updateCheckpoints(logDir, partitionToUpdateOrAdd = Option(topicPartition, offset))
// expect the checkpoint offset is now updated to the expected offset after doing updateCheckpoints
@@ -413,7 +413,7 @@ class LogCleanerManagerTest extends Logging {
// updateCheckpoints should remove the topicPartition data in the logDir
cleanerManager.updateCheckpoints(logDir, partitionToRemove = Option(topicPartition))
- assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+ assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
}
@Test
@@ -431,7 +431,7 @@ class LogCleanerManagerTest extends Logging {
cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
// verify the partition data in logDir is gone, and data in logDir2 is still there
assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition2))
- assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+ assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
}
@Test
@@ -471,7 +471,7 @@ class LogCleanerManagerTest extends Logging {
// force delete the logDir2 from checkpoints, so that the partition data should also be deleted
cleanerManager.handleLogDirFailure(logDir2.getAbsolutePath)
- assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+ assertFalse(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
}
/**
@@ -710,14 +710,16 @@ class LogCleanerManagerTest extends Logging {
assertThrows(classOf[IllegalStateException], () => cleanerManager.doneCleaning(topicPartition, log.dir, 1))
cleanerManager.setCleaningState(topicPartition, LogCleaningInProgress)
- cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+ val endOffset = 1L
+ cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
assertTrue(cleanerManager.cleaningState(topicPartition).isEmpty)
- assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
+ assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
+ assertEquals(Some(endOffset), cleanerManager.allCleanerCheckpoints.get(topicPartition))
cleanerManager.setCleaningState(topicPartition, LogCleaningAborted)
- cleanerManager.doneCleaning(topicPartition, log.dir, 1)
+ cleanerManager.doneCleaning(topicPartition, log.dir, endOffset)
assertEquals(LogCleaningPaused(1), cleanerManager.cleaningState(topicPartition).get)
- assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).nonEmpty)
+ assertTrue(cleanerManager.allCleanerCheckpoints.contains(topicPartition))
}
@Test
@@ -755,7 +757,7 @@ class LogCleanerManagerTest extends Logging {
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time)
assertEquals(None, filthiestLog, "Log should not be selected for cleaning")
- assertEquals(20L, cleanerCheckpoints.get(tp).get, "Unselected log should have checkpoint offset updated")
+ assertEquals(20L, cleanerCheckpoints(tp), "Unselected log should have checkpoint offset updated")
}
/**
@@ -777,7 +779,7 @@ class LogCleanerManagerTest extends Logging {
val filthiestLog = cleanerManager.grabFilthiestCompactedLog(time).get
assertEquals(tp1, filthiestLog.topicPartition, "Dirtier log should be selected")
- assertEquals(15L, cleanerCheckpoints.get(tp0).get, "Unselected log should have checkpoint offset updated")
+ assertEquals(15L, cleanerCheckpoints(tp0), "Unselected log should have checkpoint offset updated")
}
private def createCleanerManager(log: UnifiedLog): LogCleanerManager = {