Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/17 02:04:24 UTC

[kafka] branch trunk updated: KAFKA-6568; Log cleaner should check partition state before removal from inProgress map (#4580)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 7c09b94  KAFKA-6568; Log cleaner should check partition state before removal from inProgress map  (#4580)
7c09b94 is described below

commit 7c09b9410a63a1dda118b872afcbd2fcdcd56b65
Author: Jiangjie (Becket) Qin <be...@gmail.com>
AuthorDate: Fri Feb 16 18:04:21 2018 -0800

    KAFKA-6568; Log cleaner should check partition state before removal from inProgress map  (#4580)
    
    The log cleaner should not naively remove the partition from the in-progress map without checking the partition state. Doing so may cause another thread calling `LogCleanerManager.abortAndPauseCleaning()` to hang indefinitely.
---
 .../main/scala/kafka/log/LogCleanerManager.scala   | 35 +++++++++--
 .../unit/kafka/log/LogCleanerManagerTest.scala     | 70 ++++++++++++++++++++++
 2 files changed, 100 insertions(+), 5 deletions(-)
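
For context, here is a minimal standalone sketch of the hand-off the commit message describes. The names (CleaningStateMachine, abortAndPause) are simplified stand-ins, not the real kafka.log classes: the point is that the aborting thread blocks on a condition variable until the partition reaches the paused state, so a cleaner thread that unconditionally drops the in-progress entry never signals it.

    import java.util.concurrent.locks.ReentrantLock
    import scala.collection.mutable

    // Simplified stand-ins for kafka.log.LogCleaningState.
    sealed trait CleaningState
    case object InProgress extends CleaningState
    case object Aborted extends CleaningState
    case object Paused extends CleaningState

    // Minimal model of the LogCleanerManager hand-off, not the real class.
    class CleaningStateMachine {
      private val lock = new ReentrantLock()
      private val pausedCond = lock.newCondition()
      private val inProgress = mutable.Map[String, CleaningState]()

      // Roughly what abortAndPauseCleaning does: mark the partition aborted,
      // then block until the cleaning thread acknowledges by moving it to Paused.
      def abortAndPause(tp: String): Unit = {
        lock.lock()
        try {
          inProgress.get(tp) match {
            case None             => inProgress.put(tp, Paused) // nothing in flight
            case Some(InProgress) => inProgress.put(tp, Aborted)
            case _                => // already aborted or paused
          }
          while (!inProgress.get(tp).contains(Paused))
            pausedCond.await() // hangs forever if nobody ever signals
        } finally lock.unlock()
      }

      // The fixed behavior: only drop the entry when cleaning was still in
      // progress; if it was aborted, move it to Paused and wake the waiter.
      def doneDeleting(tp: String): Unit = {
        lock.lock()
        try {
          inProgress.get(tp) match {
            case Some(InProgress) => inProgress.remove(tp)
            case Some(Aborted) =>
              inProgress.put(tp, Paused)
              pausedCond.signalAll()
            case s =>
              throw new IllegalStateException(s"Partition $tp cannot be in $s state.")
          }
        } finally lock.unlock()
      }
    }

The pre-fix behavior corresponds to removing the entry unconditionally in doneDeleting: a thread parked in abortAndPause would then wait on pausedCond with nobody left to signal it.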

diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index c3d3892..b23107b 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -96,6 +96,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
     }
   }
 
+  /**
+    * Package private for unit test. Get the cleaning state of the partition.
+    */
+  private[log] def cleaningState(tp: TopicPartition): Option[LogCleaningState] = {
+    inLock(lock) {
+      inProgress.get(tp)
+    }
+  }
+
+  /**
+    * Package private for unit test. Set the cleaning state of the partition.
+    */
+  private[log] def setCleaningState(tp: TopicPartition, state: LogCleaningState): Unit = {
+    inLock(lock) {
+      inProgress.put(tp, state)
+    }
+  }
 
    /**
     * Choose the log to clean next and add it to the in-progress set. We recompute this
@@ -290,11 +307,11 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
    */
   def doneCleaning(topicPartition: TopicPartition, dataDir: File, endOffset: Long) {
     inLock(lock) {
-      inProgress(topicPartition) match {
-        case LogCleaningInProgress =>
+      inProgress.get(topicPartition) match {
+        case Some(LogCleaningInProgress) =>
           updateCheckpoints(dataDir, Option(topicPartition, endOffset))
           inProgress.remove(topicPartition)
-        case LogCleaningAborted =>
+        case Some(LogCleaningAborted) =>
           inProgress.put(topicPartition, LogCleaningPaused)
           pausedCleaningCond.signalAll()
         case s =>
@@ -305,7 +322,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
 
   def doneDeleting(topicPartition: TopicPartition): Unit = {
     inLock(lock) {
-      inProgress.remove(topicPartition)
+      inProgress.get(topicPartition) match {
+        case Some(LogCleaningInProgress) =>
+          inProgress.remove(topicPartition)
+        case Some(LogCleaningAborted) =>
+          inProgress.put(topicPartition, LogCleaningPaused)
+          pausedCleaningCond.signalAll()
+        case s =>
+          throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
+      }
     }
   }
 }
@@ -344,7 +369,7 @@ private[log] object LogCleanerManager extends Logging {
         offset
       }
     }
-    
+
     val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
 
     // find first segment that cannot be cleaned
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 1146029..42a447a 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -215,6 +215,76 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
     assertEquals(4L, cleanableOffsets._2)
   }
 
+  @Test
+  def testDoneCleaning(): Unit = {
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+    val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+    while(log.numberOfSegments < 8)
+      log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
+
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    val tp = new TopicPartition("log", 0)
+    try {
+      cleanerManager.doneCleaning(tp, log.dir, 1)
+    } catch {
+      case _ : IllegalStateException =>
+      case _ : Throwable => fail("Should have thrown IllegalStateException.")
+    }
+
+    try {
+      cleanerManager.setCleaningState(tp, LogCleaningPaused)
+      cleanerManager.doneCleaning(tp, log.dir, 1)
+    } catch {
+      case _ : IllegalStateException =>
+      case _ : Throwable => fail("Should have thrown IllegalStateException.")
+    }
+
+    cleanerManager.setCleaningState(tp, LogCleaningInProgress)
+    cleanerManager.doneCleaning(tp, log.dir, 1)
+    assertTrue(cleanerManager.cleaningState(tp).isEmpty)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+
+    cleanerManager.setCleaningState(tp, LogCleaningAborted)
+    cleanerManager.doneCleaning(tp, log.dir, 1)
+    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+    assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+  }
+
+  @Test
+  def testDoneDeleting(): Unit = {
+    val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+    val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
+    val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+    val tp = new TopicPartition("log", 0)
+
+    try {
+      cleanerManager.doneDeleting(tp)
+    } catch {
+      case _ : IllegalStateException =>
+      case _ : Throwable => fail("Should have thrown IllegalStateException.")
+    }
+
+    try {
+      cleanerManager.setCleaningState(tp, LogCleaningPaused)
+      cleanerManager.doneDeleting(tp)
+    } catch {
+      case _ : IllegalStateException =>
+      case _ : Throwable => fail("Should have thrown IllegalStateException.")
+    }
+
+    cleanerManager.setCleaningState(tp, LogCleaningInProgress)
+    cleanerManager.doneDeleting(tp)
+    assertTrue(cleanerManager.cleaningState(tp).isEmpty)
+
+    cleanerManager.setCleaningState(tp, LogCleaningAborted)
+    cleanerManager.doneDeleting(tp)
+    assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+
+  }
+
   private def createCleanerManager(log: Log): LogCleanerManager = {
     val logs = new Pool[TopicPartition, Log]()
     logs.put(new TopicPartition("log", 0), log)
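
One note on the new tests: the try/catch blocks above pass silently when no exception is thrown at all, since the success path simply falls out of the try. A slightly stricter variant of the same check (a sketch, not what this commit ships) fails explicitly if the expected IllegalStateException never surfaces:

    try {
      cleanerManager.doneDeleting(tp)
      fail("Should have thrown IllegalStateException.")
    } catch {
      case _: IllegalStateException => // expected: no cleaning state registered for tp
    }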

-- 
To stop receiving notification emails like this one, please contact
jgus@apache.org.