Posted to jira@kafka.apache.org by GitBox <gi...@apache.org> on 2020/08/14 01:46:26 UTC

[GitHub] [kafka] showuon opened a new pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

showuon opened a new pull request #9178:
URL: https://github.com/apache/kafka/pull/9178


   In [KIP-113](https://cwiki.apache.org/confluence/display/KAFKA/KIP-113%3A+Support+replicas+movement+between+log+directories), we added support for moving replicas between log directories. However, when the log directory changes, we forget to remove the topicPartition offset data from the old directory, which leaves more than one checkpoint copy in the logs for the altered topicPartition. This can make the LogCleaner get stuck, because it may keep reading the stale topicPartition offset data from the old checkpoint file.
   
   I added a new parameter `topicPartitionToBeRemoved` to the `updateCheckpoints()` method. If the `update` parameter is `None` (as before), we remove the `topicPartitionToBeRemoved` entry from the checkpoint file in the given directory; otherwise, we update the data as before.
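   
   A minimal sketch of the resulting call pattern, based on the diffs quoted in the review below (the variables here are illustrative):
   
   ```scala
   // Remove path: update = None plus the partition whose stale entry should be
   // dropped from the checkpoint file in the old (source) log directory.
   cleanerManager.updateCheckpoints(sourceLogDir, update = None,
     topicPartitionToBeRemoved = Some(topicPartition))
   
   // Add/update path (unchanged): pass the new (partition, offset) pair.
   cleanerManager.updateCheckpoints(destLogDir, Option(topicPartition, offset))
   ```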
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   





[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486750612



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer exist
+   * Update checkpoint file to remove topics and partitions that no longer exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       I've added an assertion for it. Thanks for the reminder.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       Nice refactor! Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is still there
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset)

Review comment:
       I checked again and I think I was right: the checkpoint file is truncated only when the provided offset is smaller than the one in the checkpoint file, so the comment is correct. I just added an equal sign (<=) to make it more accurate. Thanks.
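
       A minimal sketch of that rule (the method name follows LogCleanerManager, but the body is my assumption, not the merged implementation):

       ```scala
       // Lower the checkpointed offset for tp only when the provided offset is
       // <= the offset currently recorded in the checkpoint file; otherwise no-op.
       def maybeTruncateCheckpoint(dataDir: File, tp: TopicPartition, offset: Long): Unit = {
         val checkpoint = checkpoints(dataDir)
         val existing = checkpoint.read()
         if (existing.getOrElse(tp, 0L) >= offset)
           checkpoint.write(existing + (tp -> offset))
       }
       ```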




[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-689247342


   @tombentley @ijuma, could you help review this PR? Thank you.





[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486747077



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,30 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or remove topics and partitions that no longer exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       OK







[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486747159



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {

Review comment:
       Good suggestion! Updated.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {
+                case Some(topicPartion) =>
+                  checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartion
+                case None =>
+                  info(s"Nothing added or removed for ${dataDir.getAbsoluteFile} directory in updateCheckpoints.")

Review comment:
       Removed. Thanks.
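
       For context, a sketch of how the None branch might read once that log line is dropped (based on the diff above; an assumption, not necessarily the merged code):

       ```scala
       val existing = update match {
         case Some(updatedOffset) =>
           checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
         case None =>
           val current = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap
           // With neither an update nor a partition to remove, keep the filtered map as-is.
           topicPartitionToBeRemoved match {
             case Some(topicPartition) => current - topicPartition
             case None => current
           }
       }
       checkpoint.write(existing)
       ```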













[GitHub] [kafka] junrao merged pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
junrao merged pull request #9178:
URL: https://github.com/apache/kafka/pull/9178


   





[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486747592



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = Some(topicPartition))

Review comment:
       Good suggestion. I made the default value of the 2nd parameter `partitionToUpdateOrAdd` `None`, so here I can call it with just 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`, and the same in other places. Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486266547



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or removing topics and partitions that no longer exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {

Review comment:
       Updated. Thanks.







[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486267744



##########
File path: core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
##########
@@ -75,6 +75,17 @@ class CheckpointReadBuffer[T](location: String,
   }
 }
 
+/**
+ * This class interacts with the checkpoint file to read or write [TopicPartition, Offset] entries
+ *
+ * The format in the checkpoint file is like this:
+ *  -----checkpoint file content------
+ *  0                <- OffsetCheckpointFile.currentVersion
+ *  2                <- following entries size
+ *  tp1  par1  1     <- the format is: TOPIC  PARTITION  OFFSET
+ *  tp1  par2  2
+ *  -----checkpoint file end----------
+ */

Review comment:
       Thanks for the reminder! I've moved my comments to `OffsetCheckpointFile` and added some format comments to `LeaderEpochCheckpointFile`. Thanks.
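
       A usage sketch matching the format above (the constructor arguments and the logDir value are assumptions):

       ```scala
       import java.io.File
       import org.apache.kafka.common.TopicPartition
       import kafka.server.checkpoints.OffsetCheckpointFile

       // Write two [TopicPartition, Offset] entries, then read them back.
       val checkpoint = new OffsetCheckpointFile(new File(logDir, "cleaner-offset-checkpoint"))
       checkpoint.write(Map(new TopicPartition("tp1", 1) -> 1L,
                            new TopicPartition("tp1", 2) -> 2L))
       val offsets: Map[TopicPartition, Long] = checkpoint.read()
       ```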







[GitHub] [kafka] junrao commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r485958438



##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1698,8 +1698,12 @@ class ReplicaManager(val config: KafkaConfig,
     Partition.removeMetrics(tp)
   }
 
-  // logDir should be an absolute path
-  // sendZkNotification is needed for unit test
+  /**
+   * The log directory failure handler for the replica
+   *
+   * @param dir                     the absooute path of the log directory

Review comment:
       typo absooute

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or removing topics and partitions that no longer exist

Review comment:
       removing => remove

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or removing topics and partitions that no longer exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
-          val existing = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) } ++ update
+          val existing = update match {
+            case Some(updatedOffset) =>

Review comment:
       updatedOffset is not being used.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or removing topics and partitions that no longer exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {

Review comment:
       Could we make topicPartitionToBeRemoved as Option[TopicPartition]?

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -369,13 +381,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+   * alter the checkpoint directory for the topicPartition, to remove the data in sourceLogDir, and add the data in destLogDir
+   */
   def alterCheckpointDir(topicPartition: TopicPartition, sourceLogDir: File, destLogDir: File): Unit = {
     inLock(lock) {
       try {
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
-            // Remove this partition from the checkpoint file in the source log directory
-            updateCheckpoints(sourceLogDir, None)
+            debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
+              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")

Review comment:
       typo direcotory

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -184,7 +184,11 @@ class LogManager(logDirs: Seq[File],
     numRecoveryThreadsPerDataDir = newSize
   }
 
-  // dir should be an absolute path
+  /**
+   * The log diretory failure handler. It'll remove all the checkpoint files located in the directory

Review comment:
       typo diretory

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -393,13 +413,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+   * Stop the cleaning logs in the provided directory

Review comment:
       Stop the cleaning logs => Stop cleaning logs 

##########
File path: core/src/main/scala/kafka/server/checkpoints/CheckpointFile.scala
##########
@@ -75,6 +75,17 @@ class CheckpointReadBuffer[T](location: String,
   }
 }
 
+/**
+ * This class interacts with the checkpoint file to read or write [TopicPartition, Offset] entries
+ *
+ * The format in the checkpoint file is like this:
+ *  -----checkpoint file content------
+ *  0                <- OffsetCheckpointFile.currentVersion
+ *  2                <- following entries size
+ *  tp1  par1  1     <- the format is: TOPIC  PARTITION  OFFSET
+ *  tp1  par2  2
+ *  -----checkpoint file end----------
+ */

Review comment:
       We now have 2 different formats for checkpoint files, one for OffsetCheckpointFile and another for LeaderEpochCheckpointFile. Perhaps we can add the above comment to the appropriate class.
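
A minimal sketch of reading one entry of this format, assuming whitespace-separated fields as in the example above; the real read/write logic lives in the checkpoint file's formatter, and numeric parsing is left unguarded for brevity, so this is illustrative only:

    import org.apache.kafka.common.TopicPartition

    // Parse one "TOPIC  PARTITION  OFFSET" line; returns None when the line
    // does not have exactly three whitespace-separated fields.
    def parseEntry(line: String): Option[(TopicPartition, Long)] =
      line.trim.split("\\s+") match {
        case Array(topic, partition, offset) =>
          Some(new TopicPartition(topic, partition.toInt) -> offset.toLong)
        case _ => None
      }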




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486747077



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,30 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or remove topics and partitions that no longer exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       OK

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {

Review comment:
       Good suggestion! Updated.
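
A minimal sketch of how the two nested matches might fold into a single expression, using the names from the diff above; this is an assumption about the suggested shape, not the committed code, and the write call is assumed to follow as before:

    // Drop entries for logs that no longer exist, apply the optional update,
    // then drop the optional partition to be removed. Map ++ Option and
    // Map -- Option work via Scala's Option-to-Iterable conversion.
    val existing = checkpoint.read()
      .filter { case (tp, _) => logs.keys.contains(tp) }
      .toMap ++ update -- topicPartitionToBeRemoved
    checkpoint.write(existing)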

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {
+                case Some(topicPartion) =>
+                  checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartion
+                case None =>
+                  info(s"Nothing added or removed for ${dataDir.getAbsoluteFile} directory in updateCheckpoints.")

Review comment:
       Removed. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = Some(topicPartition))

Review comment:
       Good suggestion. I made the default value of the 2nd parameter `partitionToUpdateOrAdd` `None`, so here I can call with just 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`, and did the same in the other places. Thanks.
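
A sketch of the resulting signature, assuming both optional parameters default to `None` so call sites only name the argument they care about:

    def updateCheckpoints(dataDir: File,
                          partitionToUpdateOrAdd: Option[(TopicPartition, Long)] = None,
                          topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit

    // A removal-only call site then reads like the one quoted above:
    updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))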

##########
File path: core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
##########
@@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
 }
 
 /**
-  * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
-  */
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
+ *
+ * The format in the LeaderEpoch checkpoint file is like this:
+ * -----checkpoint file begin------
+ * 0                <- LeaderEpochCheckpointFile.currentVersion
+ * 2                <- following entries size
+ * 0  1     <- the format is: leader_epoch(int32) end_offset(int64)

Review comment:
       You are right. I referenced KIP-101 when documenting this, and after your reminder I found the KIP itself is wrong: the description says "start offset", but in the table below it becomes "end offset". I confirmed this is a typo and updated the KIP as well. Thank you.
   
   
   ![image](https://user-images.githubusercontent.com/43372967/92851603-5aef8980-f420-11ea-9704-a54e297c6cc2.png)
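
A self-contained sketch of one entry in this layout, assuming the corrected reading from the discussion above (the second field is a start offset, not an end offset); the `EpochEntry` case class here only mirrors the real one for illustration:

    // Mirrors kafka.server.epoch.EpochEntry for illustration only.
    case class EpochEntry(epoch: Int, startOffset: Long)

    // Parse one "leader_epoch  start_offset" line, e.g. "0  1".
    def parseEpochEntry(line: String): Option[EpochEntry] =
      line.trim.split("\\s+") match {
        case Array(epoch, offset) => Some(EpochEntry(epoch.toInt, offset.toLong))
        case _ => None
      }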
   
   

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer exist
+   * Update checkpoint file to remove topics and partitions that no longer exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`
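
A sketch of the delegating method after that change, assuming `LogCleanerManager.updateCheckpoints` now defaults its update parameter to `None`, so only the removal argument needs forwarding:

    def updateCheckpoints(dataDir: File,
                          topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
      cleanerManager.updateCheckpoints(dataDir,
        topicPartitionToBeRemoved = topicPartitionToBeRemoved)
    }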

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       I added an assertion for it. Thanks for the reminder.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       Nice refactor! Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is still there
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset)

Review comment:
       I checked again and I think I was right: the checkpoint file is truncated only when the provided offset is smaller than the one in the checkpoint file, so the comment is correct. I just added an equals sign (<=) to make it more accurate. Thanks.
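
A sketch of the truncation rule as described, assuming the check rewrites the entry only when the checkpointed offset is greater than or equal to the offset passed in:

    // Keep the map unchanged when the checkpointed offset is already below
    // the given offset; otherwise lower the checkpoint to the given offset.
    def maybeTruncate(existing: Map[TopicPartition, Long],
                      tp: TopicPartition,
                      offset: Long): Map[TopicPartition, Long] =
      existing.get(tp) match {
        case Some(checkpointed) if checkpointed >= offset => existing.updated(tp, offset)
        case _ => existing
      }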

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,30 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or remove topics and partitions that no longer exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       OK

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {

Review comment:
       Good suggestion! Updated.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {
+                case Some(topicPartion) =>
+                  checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartion
+                case None =>
+                  info(s"Nothing added or removed for ${dataDir.getAbsoluteFile} directory in updateCheckpoints.")

Review comment:
       Removed. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = Some(topicPartition))

Review comment:
       Good suggestion. I make the default value of 2nd parameter `partitionToUpdateOrAdd` to be `None`, so here, I can just call with 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = Some(topicPartition))

Review comment:
       Good suggestion. I make the default value of 2nd parameter `partitionToUpdateOrAdd` to be `None`, so here, I can just call with 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`, and other places as well. Thanks.

##########
File path: core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
##########
@@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
 }
 
 /**
-  * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
-  */
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
+ *
+ * The format in the LeaderEpoch checkpoint file is like this:
+ * -----checkpoint file begin------
+ * 0                <- LeaderEpochCheckpointFile.currentVersion
+ * 2                <- following entries size
+ * 0  1     <- the format is: leader_epoch(int32) end_offset(int64)

Review comment:
       You are right. I referenced the KIP-101 to document it. After your reminding, I found the KIP is wrong. In the description, it said it's "Start offset", but in the table below, it becomes "end offset". I confirmed this is typo. I also updated the KIP as well. Thank you.
   
   
   ![image](https://user-images.githubusercontent.com/43372967/92851603-5aef8980-f420-11ea-9704-a54e297c6cc2.png)
   
   

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer exist
+   * Update checkpoint file to remove topics and partitions that no longer exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       I assert it. Thanks for reminding.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       Nice refactor! Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is still there
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The truncate Checkpoint file will happen only when the provided offset smaller than the one the the checkpoint file. So the comment is correct. Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is still there
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The truncate Checkpoint file will happen only when the provided offset smaller than the one the the checkpoint file. So the comment is correct. I just added an equal sign (<=) to make it more accurate. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {

Review comment:
       Good suggestion! Updated.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {
+                case Some(topicPartion) =>
+                  checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartion
+                case None =>
+                  info(s"Nothing added or removed for ${dataDir.getAbsoluteFile} directory in updateCheckpoints.")

Review comment:
       Removed. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = Some(topicPartition))

Review comment:
       Good suggestion. I make the default value of 2nd parameter `partitionToUpdateOrAdd` to be `None`, so here, I can just call with 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = Some(topicPartition))

Review comment:
       Good suggestion. I make the default value of 2nd parameter `partitionToUpdateOrAdd` to be `None`, so here, I can just call with 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`, and other places as well. Thanks.

##########
File path: core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
##########
@@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
 }
 
 /**
-  * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
-  */
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
+ *
+ * The format in the LeaderEpoch checkpoint file is like this:
+ * -----checkpoint file begin------
+ * 0                <- LeaderEpochCheckpointFile.currentVersion
+ * 2                <- following entries size
+ * 0  1     <- the format is: leader_epoch(int32) end_offset(int64)

Review comment:
       You are right. I referenced the KIP-101 to document it. After your reminding, I found the KIP is wrong. In the description, it said it's "Start offset", but in the table below, it becomes "end offset". I confirmed this is typo. I also updated the KIP as well. Thank you.
   
   
   ![image](https://user-images.githubusercontent.com/43372967/92851603-5aef8980-f420-11ea-9704-a54e297c6cc2.png)
   
   

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer exist
+   * Update checkpoint file to remove topics and partitions that no longer exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       I assert it. Thanks for reminding.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       Nice refactor! Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is still there
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The truncate Checkpoint file will happen only when the provided offset smaller than the one the the checkpoint file. So the comment is correct. Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is still there
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The truncate Checkpoint file will happen only when the provided offset smaller than the one the the checkpoint file. So the comment is correct. I just added an equal sign (<=) to make it more accurate. Thanks.

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer exist
+   * Update checkpoint file to remove topics and partitions that no longer exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer exist
+   * Update checkpoint file to remove topics and partitions that no longer exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       I assert it. Thanks for reminding.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       Nice refactor! Thanks.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is still there
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The checkpoint file truncation happens only when the provided offset is smaller than the one in the checkpoint file, so the comment is correct. I just added an equal sign (<=) to make it more accurate. Thanks.
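
   Roughly, the guard being described looks like this (a sketch only — names are taken from the diffs above, and the exact method body in `LogCleanerManager` may differ):

   ```scala
   // Sketch: truncate the checkpointed offset for a partition down to `offset`,
   // but only when the currently checkpointed offset is strictly greater.
   def maybeTruncateCheckpoint(dataDir: File, topicPartition: TopicPartition, offset: Long): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         val existing = checkpoint.read()
         // checkpointed offset <= offset  =>  nothing to do, the checkpoint is not ahead
         if (existing.getOrElse(topicPartition, 0L) > offset)
           checkpoint.write(existing + (topicPartition -> offset))
       }
     }
   }
   ```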




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486266701



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,24 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or removing topics and partitions that no longer exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
-          val existing = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) } ++ update
+          val existing = update match {
+            case Some(updatedOffset) =>

Review comment:
       Nice catch. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-680857804


   @mingaliu @hachikuji @omkreddy , could you please review this PR? Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486749618



##########
File path: core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
##########
@@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
 }
 
 /**
-  * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
-  */
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
+ *
+ * The format in the LeaderEpoch checkpoint file is like this:
+ * -----checkpoint file begin------
+ * 0                <- LeaderEpochCheckpointFile.currentVersion
+ * 2                <- following entries size
+ * 0  1     <- the format is: leader_epoch(int32) end_offset(int64)

Review comment:
       You are right. I referenced KIP-101 to document it. After your reminder, I found the KIP itself is wrong: in the description it says "Start Offset", but in the table below it becomes "end offset". I confirmed this is a typo and updated the KIP as well. Thank you.
   
   
   ![image](https://user-images.githubusercontent.com/43372967/92851603-5aef8980-f420-11ea-9704-a54e297c6cc2.png)
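
   For reference, under the "first offset in each epoch" reading, the checkpoint file contents would look like this (illustrative values only):

   ```
   0        <- version
   2        <- number of entries
   0 0      <- epoch 0 started at offset 0
   1 120    <- epoch 1 started at offset 120
   ```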
   
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486750612



##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer exist
+   * Update checkpoint file to remove topics and partitions that no longer exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment:
       Sure. I also removed the 2nd param `update=None`
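
   The wrapper then ends up looking something like this (assumed shape, relying on the manager's `update` param defaulting to `None`):

   ```scala
   // LogCleaner: only the removal case flows through this entry point,
   // so the add/update param of the manager can keep its default.
   def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     cleanerManager.updateCheckpoints(dataDir, topicPartitionToBeRemoved = topicPartitionToBeRemoved)
   }
   ```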




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] junrao commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
junrao commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486454693



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,30 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or remove topics and partitions that no longer exist
+   *
+   * @param dataDir                       The File object to be updated
+   * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
+   * @param topicPartitionToBeRemoved     The TopicPartition to be removed
+   */
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       To be consistent, perhaps change update to partitionToUpdateOrAdd and topicPartitionToBeRemoved to partitionToRemove? 

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {
+                case Some(topicPartion) =>
+                  checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartion
+                case None =>
+                  info(s"Nothing added or removed for ${dataDir.getAbsoluteFile} directory in updateCheckpoints.")

Review comment:
       Do we need to log this?

##########
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##########
@@ -203,16 +203,24 @@ class LogCleaner(initialConfig: CleanerConfig,
   }
 
   /**
-   * Update checkpoint file, removing topics and partitions that no longer exist
+   * Update checkpoint file to remove topics and partitions that no longer exist
    */
-  def updateCheckpoints(dataDir: File): Unit = {
-    cleanerManager.updateCheckpoints(dataDir, update=None)
+  def updateCheckpoints(dataDir: File, topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
+    cleanerManager.updateCheckpoints(dataDir, update=None, topicPartitionToBeRemoved)

Review comment:
       Could we use a named param for topicPartitionToBeRemoved?

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -354,12 +354,30 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)]): Unit = {
+  /**
+   * Update checkpoint file, or remove topics and partitions that no longer exist

Review comment:
       Perhaps tweak the comment to "Update checkpoint file, adding or removing partitions if necessary."?

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))

Review comment:
       Could we use a named param for update? Ditto below.

##########
File path: core/src/main/scala/kafka/server/checkpoints/LeaderEpochCheckpointFile.scala
##########
@@ -52,8 +52,16 @@ object LeaderEpochCheckpointFile {
 }
 
 /**
-  * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
-  */
+ * This class persists a map of (LeaderEpoch => Offsets) to a file (for a certain replica)
+ *
+ * The format in the LeaderEpoch checkpoint file is like this:
+ * -----checkpoint file begin------
+ * 0                <- LeaderEpochCheckpointFile.currentVersion
+ * 2                <- following entries size
+ * 0  1     <- the format is: leader_epoch(int32) end_offset(int64)

Review comment:
       The map stores the first offset in each epoch.

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -355,22 +355,28 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
   }
 
   /**
-   * Update checkpoint file, or removing topics and partitions that no longer exist
+   * Update checkpoint file, or remove topics and partitions that no longer exist
    *
    * @param dataDir                       The File object to be updated
    * @param update                        The [TopicPartition, Long] map data to be updated. pass "none" if doing remove, not add
    * @param topicPartitionToBeRemoved     The TopicPartition to be removed
    */
-  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: TopicPartition = null): Unit = {
+  def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)], topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         try {
           val existing = update match {
             case Some(updatedOffset) =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap ++ update
+              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap + updatedOffset
             case None =>
-              checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap - topicPartitionToBeRemoved
+              topicPartitionToBeRemoved match {

Review comment:
       This method assumes that only one of update and topicPartitionToBeRemoved will be set. Perhaps we could just handle the more general case that both could be set?
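
   Something along these lines would cover both at once (a sketch only, using the renamed params suggested above; error handling from the original method is omitted):

   ```scala
   def updateCheckpoints(dataDir: File,
                         partitionToUpdateOrAdd: Option[(TopicPartition, Long)] = None,
                         partitionToRemove: Option[TopicPartition] = None): Unit = {
     inLock(lock) {
       val checkpoint = checkpoints(dataDir)
       if (checkpoint != null) {
         // start from partitions that still exist, then apply removal and update independently
         val existing = checkpoint.read().filter { case (tp, _) => logs.keys.contains(tp) }.toMap
         val afterRemove = partitionToRemove.fold(existing)(existing - _)
         val updated = partitionToUpdateOrAdd.fold(afterRemove)(afterRemove + _)
         checkpoint.write(updated)
       }
     }
   }
   ```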

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -393,13 +419,21 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+   * Stop cleaning logs in the provided directory
+   *
+   * @param dir     the absolute path of the log dir
+   */
   def handleLogDirFailure(dir: String): Unit = {
     warn(s"Stopping cleaning logs in dir $dir")
     inLock(lock) {
       checkpoints = checkpoints.filter { case (k, _) => k.getAbsolutePath != dir }
     }
   }
 
+  /**
+   * Truncate the checkpoint file for the given partition if its checkpointed offset is larger than the given offset

Review comment:
       Truncate the checkpoint file for the given partition => Truncate the checkpointed offset for the given partition

##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = Some(topicPartition))

Review comment:
       Could we use the named param for update in updateCheckpoints() below and in other places to make it clear?

##########
File path: core/src/main/scala/kafka/server/ReplicaManager.scala
##########
@@ -1729,8 +1729,12 @@ class ReplicaManager(val config: KafkaConfig,
     Partition.removeMetrics(tp)
   }
 
-  // logDir should be an absolute path
-  // sendZkNotification is needed for unit test
+  /**
+   * The log directory failure handler for the replica
+   *
+   * @param dir                     the absolute path of the log directory
+   * @param sendZkNotification      check if we need to send notificiation to zookeeper node (needed for unit test)

Review comment:
       typo notificiation

##########
File path: core/src/main/scala/kafka/log/LogManager.scala
##########
@@ -184,7 +184,11 @@ class LogManager(logDirs: Seq[File],
     numRecoveryThreadsPerDataDir = newSize
   }
 
-  // dir should be an absolute path
+  /**
+   * The log directory failure handler. It'll remove all the checkpoint files located in the directory

Review comment:
       It'll remove all the checkpoint files located in the directory  => It will stop log cleaning in that directory.

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is still there
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset)

Review comment:
       It seems the < here should be > and the > two lines below should be <?

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints

Review comment:
       expectedOffset => expected offset

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       Should we handle topicPartitionToBeRemoved or assert it is None?

##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)

Review comment:
       allCleanerCheckpoints.get(topicPartition).get) can just be allCleanerCheckpoints(topicPartition). Ditto below.
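
   i.e. (equivalent, just more idiomatic Scala, since allCleanerCheckpoints returns a Map):

   ```scala
   assertEquals(offset, cleanerManager.allCleanerCheckpoints(topicPartition))
   ```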




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486747592



##########
File path: core/src/main/scala/kafka/log/LogCleanerManager.scala
##########
@@ -390,9 +396,9 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
         checkpoints.get(sourceLogDir).flatMap(_.read().get(topicPartition)) match {
           case Some(offset) =>
             debug(s"Removing the partition offset data in checkpoint file for '${topicPartition}' " +
-              s"from ${sourceLogDir.getAbsoluteFile} direcotory.")
+              s"from ${sourceLogDir.getAbsoluteFile} directory.")
             // Remove this partition data from the checkpoint file in the source log directory
-            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = topicPartition)
+            updateCheckpoints(sourceLogDir, None, topicPartitionToBeRemoved = Some(topicPartition))

Review comment:
       Good suggestion. I made the default value of the 2nd parameter `partitionToUpdateOrAdd` `None`, so here I can just call it with 2 params: `updateCheckpoints(sourceLogDir, topicPartitionToBeRemoved = Some(topicPartition))`. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486751738



##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -361,6 +366,93 @@ class LogCleanerManagerTest extends Logging {
     assertEquals("should have 1 logs ready to be deleted", 1, deletableLog3.size)
   }
 
+  @Test
+  def testUpdateCheckpointsShouldAddOffsetToPartition(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // expect the checkpoint offset is not the expectedOffset before doing updateCheckpoints
+    assertNotEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).getOrElse(0))
+
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    // expect the checkpoint offset is now updated to the expectedOffset after doing updateCheckpoints
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+  }
+
+  @Test
+  def testUpdateCheckpointsShouldRemovePartitionData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // updateCheckpoints should remove the topicPartition data in the logDir
+    cleanerManager.updateCheckpoints(logDir, None, topicPartitionToBeRemoved = Some(topicPartition))
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testHandleLogDirFailureShouldRemoveDirAndData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    // write some data into the cleaner-offset-checkpoint file in logDir and logDir2
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    cleanerManager.updateCheckpoints(logDir2, Option(topicPartition2, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+
+    cleanerManager.handleLogDirFailure(logDir.getAbsolutePath)
+    // verify the partition data in logDir is gone, and data in logDir2 is still there
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition2).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(topicPartition).isEmpty)
+  }
+
+  @Test
+  def testMaybeTruncateCheckpointShouldTruncateData(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+    val lowerOffset = 1L
+    val higherOffset = 1000L
+
+    // write some data into the cleaner-offset-checkpoint file in logDir
+    cleanerManager.updateCheckpoints(logDir, Option(topicPartition, offset))
+    assertEquals(offset, cleanerManager.allCleanerCheckpoints.get(topicPartition).get)
+
+    // we should not truncate the checkpoint data for checkpointed offset < the given offset (higherOffset)

Review comment:
       I checked again and I think I was right. The checkpoint file truncation happens only when the provided offset is smaller than the one in the checkpoint file, so the comment is correct. Thanks.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-690856399


   @junrao , thanks for the good comments. I've updated in this commit: https://github.com/apache/kafka/pull/9178/commits/c7f436292209f1778f9455421f67fd81eea81f21. Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-690200951


   @junrao , thanks for your comments. I've updated in this commit: https://github.com/apache/kafka/pull/9178/commits/5320318eec5cdb938cafd385f45525472370d359. Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on a change in pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on a change in pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#discussion_r486751046



##########
File path: core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
##########
@@ -55,7 +59,8 @@ class LogCleanerManagerTest extends Logging {
       cleanerCheckpoints.toMap
     }
 
-    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)]): Unit = {
+    override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition,Long)],
+                                   topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {

Review comment:
       I assert it. Thanks for reminding.
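
   i.e. something like this in the test double (assumed shape — this mock only needs the add/update path, and `cleanerCheckpoints` is the mutable map backing it):

   ```scala
   override def updateCheckpoints(dataDir: File, update: Option[(TopicPartition, Long)],
                                  topicPartitionToBeRemoved: Option[TopicPartition] = None): Unit = {
     // this test double does not support the removal path
     assert(topicPartitionToBeRemoved.isEmpty, "partition removal is not supported by this mock")
     update match {
       case Some((tp, offset)) => cleanerCheckpoints.put(tp, offset)
       case None => // nothing to record
     }
   }
   ```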




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon edited a comment on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon edited a comment on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-673851321


   @omkreddy @hachikuji @mingaliu , could you review this PR? Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-691365862


   @junrao , No, the failing tests are not related to my change.
   
   **tests/Build/JDK 11**: PASS
   **tests/Build/JDK 15**: DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota failed, which is not related to my change
   **tests/Build/JDK 11**: KafkaConsumerTest.testReturnRecordsDuringRebalance failed, which is not related to my change
   
   Thank you.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] ijuma commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
ijuma commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-689590729


   @junrao maybe you can review this.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



[GitHub] [kafka] showuon commented on pull request #9178: KAFKA-8362: fix the old checkpoint won't be removed after alter log dir

Posted by GitBox <gi...@apache.org>.
showuon commented on pull request #9178:
URL: https://github.com/apache/kafka/pull/9178#issuecomment-673851321


   @omkreddy @hachikuji , could you review this PR? Thanks.


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org