Posted to commits@kafka.apache.org by ij...@apache.org on 2020/01/31 11:44:41 UTC
[kafka] branch 2.4 updated: KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.4
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.4 by this push:
new f78878e KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
f78878e is described below
commit f78878e6c1efbd98e9ebeff61979738871a54ffb
Author: Tomislav <to...@gmail.com>
AuthorDate: Tue Jan 14 23:03:44 2020 +0100
KAFKA-8764: LogCleanerManager endless loop while compacting/cleaning (#7932)
This fix makes the LogCleaner tolerant of gaps in the offset sequence. Previously, such gaps could lead to endless cleaning loops that required manual intervention.
Reviewers: Jun Rao <ju...@gmail.com>, David Arthur <mu...@gmail.com>
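For context on the patch below: each dirty segment gets paired with the base offset of the segment that follows it (or the end of the clean range for the last one), so the offset map can be advanced across any hole at a segment boundary. A minimal sketch of that pairing idea, where OffsetGapPairing, SegmentInfo and nextStartOffsets are hypothetical stand-ins (SegmentInfo replaces kafka.log.LogSegment, of which only baseOffset matters here):

import scala.collection.mutable.ListBuffer

object OffsetGapPairing {
  // Hypothetical stand-in for kafka.log.LogSegment.
  final case class SegmentInfo(baseOffset: Long)

  // Mirrors the patch: the i-th dirty segment is paired with the (i+1)-th
  // segment's base offset; the last segment is paired with the clean range end.
  def nextStartOffsets(dirty: Seq[SegmentInfo], end: Long): Seq[Long] = {
    val next = new ListBuffer[Long]
    if (dirty.nonEmpty) {
      for (segment <- dirty.tail) next.append(segment.baseOffset)
      next.append(end)
    }
    next.toList
  }

  def main(args: Array[String]): Unit = {
    // Two dirty segments with a hole at offset 10, cleaning offsets [0, 12).
    val dirty = Seq(SegmentInfo(0L), SegmentInfo(11L))
    for ((segment, next) <- dirty.zip(nextStartOffsets(dirty, end = 12L)))
      println(s"segment@${segment.baseOffset} can fast-forward the map to offset ${next - 1}")
    // segment@0 can fast-forward the map to offset 10
    // segment@11 can fast-forward the map to offset 11
  }
}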
---
core/src/main/scala/kafka/log/LogCleaner.scala | 15 +++++++-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 43 ++++++++++++++++++++++
2 files changed, 56 insertions(+), 2 deletions(-)
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index bcb3586..f7babf7 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -36,6 +36,7 @@ import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import scala.collection.JavaConverters._
+import scala.collection.mutable.ListBuffer
import scala.collection.{Iterable, Seq, Set, mutable}
import scala.util.control.ControlThrowable
@@ -878,6 +879,11 @@ private[log] class Cleaner(val id: Int,
stats: CleanerStats): Unit = {
map.clear()
val dirty = log.logSegments(start, end).toBuffer
+ val nextSegmentStartOffsets = new ListBuffer[Long]
+ if (dirty.nonEmpty) {
+ for (nextSegment <- dirty.tail) nextSegmentStartOffsets.append(nextSegment.baseOffset)
+ nextSegmentStartOffsets.append(end)
+ }
info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
val transactionMetadata = new CleanedTransactionMetadata
@@ -887,10 +893,10 @@ private[log] class Cleaner(val id: Int,
// Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
// but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
var full = false
- for (segment <- dirty if !full) {
+ for ( (segment, nextSegmentStartOffset) <- dirty.zip(nextSegmentStartOffsets) if !full) {
checkDone(log.topicPartition)
- full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize,
+ full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, nextSegmentStartOffset, log.config.maxMessageSize,
transactionMetadata, stats)
if (full)
debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
@@ -911,6 +917,7 @@ private[log] class Cleaner(val id: Int,
segment: LogSegment,
map: OffsetMap,
startOffset: Long,
+ nextSegmentStartOffset: Long,
maxLogMessageSize: Int,
transactionMetadata: CleanedTransactionMetadata,
stats: CleanerStats): Boolean = {
@@ -964,6 +971,10 @@ private[log] class Cleaner(val id: Int,
if(position == startPosition)
growBuffersOrFail(segment.log, position, maxLogMessageSize, records)
}
+
+ // In the case of an offset gap, fast forward to the latest expected offset in this segment.
+ map.updateLatestOffset(nextSegmentStartOffset - 1L)
+
restoreBuffers()
false
}
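The updateLatestOffset call above is what breaks the loop: the cleaner derives the next dirty offset from the offset map's latest offset, and before this patch the map only ever recorded offsets of records it actually read, so a hole at the end of a segment could leave that resume point stuck below the gap round after round. A hedged sketch of the arithmetic, with plain Longs standing in for the real OffsetMap and ResumePointSketch being a hypothetical name:

object ResumePointSketch {
  def main(args: Array[String]): Unit = {
    val lastRecordOffset = 9L        // last record actually present in the dirty segment
    val nextSegmentStartOffset = 11L // following segment starts at 11, so offset 10 is a hole

    // Without the fix: the map's latest offset is the last record it read.
    val resumeWithoutFix = lastRecordOffset + 1            // 10 -- inside the hole, never advances
    // With the fix: map.updateLatestOffset(nextSegmentStartOffset - 1L) fast-forwards.
    val resumeWithFix = (nextSegmentStartOffset - 1L) + 1  // 11 -- past the hole

    println(s"resume point without fix: $resumeWithoutFix (same dirty range next round)")
    println(s"resume point with fix:    $resumeWithFix (cleaning moves on)")
  }
}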
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 18d86b6..644822c 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -1561,6 +1561,49 @@ class LogCleanerTest {
assertEquals("The tombstone should be retained.", 1, log.logSegments.head.log.batches.iterator.next().lastOffset)
}
+ /**
+ * Verify that the cleaner is able to move beyond missing offset records in the dirty log
+ */
+ @Test
+ def testCleaningBeyondMissingOffsets(): Unit = {
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024*1024: java.lang.Integer)
+ logProps.put(LogConfig.CleanupPolicyProp, LogConfig.Compact)
+ val logConfig = LogConfig(logProps)
+ val cleaner = makeCleaner(Int.MaxValue)
+
+ {
+ val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
+ writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+ // roll new segment with baseOffset 11, leaving previous with a hole at offset 10
+ log.roll(Some(11L))
+
+ // active segment record
+ log.appendAsFollower(messageWithOffset(1015, 1015, 11L))
+
+ val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
+ assertEquals("Cleaning point should pass offset gap", log.activeSegment.baseOffset, nextDirtyOffset)
+ }
+
+
+ {
+ val log = makeLog(dir = TestUtils.randomPartitionLogDir(tmpdir), config = logConfig)
+ writeToLog(log, (0 to 9) zip (0 to 9), (0L to 9L))
+ // roll new segment with baseOffset 15, leaving previous with holes in offset range [10, 14]
+ log.roll(Some(15L))
+
+ writeToLog(log, (15 to 24) zip (15 to 24), (15L to 24L))
+ // roll new segment with baseOffset 30, leaving previous with holes in offset range [25, 29]
+ log.roll(Some(30L))
+
+ // active segment record
+ log.appendAsFollower(messageWithOffset(1015, 1015, 30L))
+
+ val (nextDirtyOffset, _) = cleaner.clean(LogToClean(log.topicPartition, log, 0L, log.activeSegment.baseOffset, needCompactionNow = true))
+ assertEquals("Cleaning point should pass offset gap in multiple segments", log.activeSegment.baseOffset, nextDirtyOffset)
+ }
+ }
+
private def writeToLog(log: Log, keysAndValues: Iterable[(Int, Int)], offsetSeq: Iterable[Long]): Iterable[Long] = {
for(((key, value), offset) <- keysAndValues.zip(offsetSeq))
yield log.appendAsFollower(messageWithOffset(key, value, offset)).lastOffset