You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2018/02/17 02:04:24 UTC
[kafka] branch trunk updated: KAFKA-6568; Log cleaner should check partition state before removal from inProgress map (#4580)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 7c09b94 KAFKA-6568; Log cleaner should check partition state before removal from inProgress map (#4580)
7c09b94 is described below
commit 7c09b9410a63a1dda118b872afcbd2fcdcd56b65
Author: Jiangjie (Becket) Qin <be...@gmail.com>
AuthorDate: Fri Feb 16 18:04:21 2018 -0800
KAFKA-6568; Log cleaner should check partition state before removal from inProgress map (#4580)
The log cleaner should not naively remove a partition from the in-progress map without first checking the partition's cleaning state. Doing so may cause another thread that is calling `LogCleanerManager.abortAndPauseCleaning()` to hang indefinitely.
---
.../main/scala/kafka/log/LogCleanerManager.scala | 35 +++++++++--
.../unit/kafka/log/LogCleanerManagerTest.scala | 70 ++++++++++++++++++++++
2 files changed, 100 insertions(+), 5 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index c3d3892..b23107b 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -96,6 +96,23 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
}
}
+ /**
+ * Package private for unit test. Get the cleaning state of the partition.
+ */
+ private[log] def cleaningState(tp: TopicPartition): Option[LogCleaningState] = {
+ inLock(lock) {
+ inProgress.get(tp)
+ }
+ }
+
+ /**
+ * Package private for unit test. Set the cleaning state of the partition.
+ */
+ private[log] def setCleaningState(tp: TopicPartition, state: LogCleaningState): Unit = {
+ inLock(lock) {
+ inProgress.put(tp, state)
+ }
+ }
/**
* Choose the log to clean next and add it to the in-progress set. We recompute this
@@ -290,11 +307,11 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
*/
def doneCleaning(topicPartition: TopicPartition, dataDir: File, endOffset: Long) {
inLock(lock) {
- inProgress(topicPartition) match {
- case LogCleaningInProgress =>
+ inProgress.get(topicPartition) match {
+ case Some(LogCleaningInProgress) =>
updateCheckpoints(dataDir, Option(topicPartition, endOffset))
inProgress.remove(topicPartition)
- case LogCleaningAborted =>
+ case Some(LogCleaningAborted) =>
inProgress.put(topicPartition, LogCleaningPaused)
pausedCleaningCond.signalAll()
case s =>
@@ -305,7 +322,15 @@ private[log] class LogCleanerManager(val logDirs: Seq[File],
def doneDeleting(topicPartition: TopicPartition): Unit = {
inLock(lock) {
- inProgress.remove(topicPartition)
+ inProgress.get(topicPartition) match {
+ case Some(LogCleaningInProgress) =>
+ inProgress.remove(topicPartition)
+ case Some(LogCleaningAborted) =>
+ inProgress.put(topicPartition, LogCleaningPaused)
+ pausedCleaningCond.signalAll()
+ case s =>
+ throw new IllegalStateException(s"In-progress partition $topicPartition cannot be in $s state.")
+ }
}
}
}
@@ -344,7 +369,7 @@ private[log] object LogCleanerManager extends Logging {
offset
}
}
-
+
val compactionLagMs = math.max(log.config.compactionLagMs, 0L)
// find first segment that cannot be cleaned
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index 1146029..42a447a 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -215,6 +215,76 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
assertEquals(4L, cleanableOffsets._2)
}
+ @Test
+ def testDoneCleaning(): Unit = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+ while(log.numberOfSegments < 8)
+ log.appendAsLeader(records(log.logEndOffset.toInt, log.logEndOffset.toInt, time.milliseconds()), leaderEpoch = 0)
+
+ val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+ val tp = new TopicPartition("log", 0)
+ try {
+ cleanerManager.doneCleaning(tp, log.dir, 1)
+ } catch {
+ case _ : IllegalStateException =>
+ case _ : Throwable => fail("Should have thrown IllegalStateException.")
+ }
+
+ try {
+ cleanerManager.setCleaningState(tp, LogCleaningPaused)
+ cleanerManager.doneCleaning(tp, log.dir, 1)
+ } catch {
+ case _ : IllegalStateException =>
+ case _ : Throwable => fail("Should have thrown IllegalStateException.")
+ }
+
+ cleanerManager.setCleaningState(tp, LogCleaningInProgress)
+ cleanerManager.doneCleaning(tp, log.dir, 1)
+ assertTrue(cleanerManager.cleaningState(tp).isEmpty)
+ assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+
+ cleanerManager.setCleaningState(tp, LogCleaningAborted)
+ cleanerManager.doneCleaning(tp, log.dir, 1)
+ assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+ assertTrue(cleanerManager.allCleanerCheckpoints.get(tp).nonEmpty)
+ }
+
+ @Test
+ def testDoneDeleting(): Unit = {
+ val records = TestUtils.singletonRecords("test".getBytes, key="test".getBytes)
+ val log: Log = createLog(records.sizeInBytes * 5, LogConfig.Compact + "," + LogConfig.Delete)
+ val cleanerManager: LogCleanerManager = createCleanerManager(log)
+
+ val tp = new TopicPartition("log", 0)
+
+ try {
+ cleanerManager.doneDeleting(tp)
+ } catch {
+ case _ : IllegalStateException =>
+ case _ : Throwable => fail("Should have thrown IllegalStateException.")
+ }
+
+ try {
+ cleanerManager.setCleaningState(tp, LogCleaningPaused)
+ cleanerManager.doneDeleting(tp)
+ } catch {
+ case _ : IllegalStateException =>
+ case _ : Throwable => fail("Should have thrown IllegalStateException.")
+ }
+
+ cleanerManager.setCleaningState(tp, LogCleaningInProgress)
+ cleanerManager.doneDeleting(tp)
+ assertTrue(cleanerManager.cleaningState(tp).isEmpty)
+
+ cleanerManager.setCleaningState(tp, LogCleaningAborted)
+ cleanerManager.doneDeleting(tp)
+ assertEquals(LogCleaningPaused, cleanerManager.cleaningState(tp).get)
+
+ }
+
private def createCleanerManager(log: Log): LogCleanerManager = {
val logs = new Pool[TopicPartition, Log]()
logs.put(new TopicPartition("log", 0), log)
--
To stop receiving notification emails like this one, please contact
jgus@apache.org.