Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/09 01:01:07 UTC
[kafka] branch trunk updated: KAFKA-7385; Fix log cleaner behavior when only empty batches are retained (#5623)
This is an automated email from the ASF dual-hosted git repository.
jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 958cdca KAFKA-7385; Fix log cleaner behavior when only empty batches are retained (#5623)
958cdca is described below
commit 958cdca9bece8b65ceb204e1c7a14cf44729bb66
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Sat Sep 8 18:01:01 2018 -0700
KAFKA-7385; Fix log cleaner behavior when only empty batches are retained (#5623)
With idempotent/transactional producers, we may leave empty batches in the log during log compaction. When filtering the data, we keep track of state such as `maxOffset` and `maxTimestamp` of the filtered data. This patch ensures we maintain this state correctly for the case when only empty batches are retained in `MemoryRecords#filterTo`. Without this patch, we did not initialize `maxOffset` in this edge case, which led us to append data to the log with `maxOffset` = -1L, causing the append to fail.
Reviewers: Jason Gustafson <ja...@confluent.io>
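
For context, a minimal sketch of the edge case this patch covers, adapted from the testEmptyBatchRetention case added below (the class names and the writeEmptyHeader signature are taken from the diff; imports from java.nio and org.apache.kafka.common.* are elided):

    ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
    // Write a header-only (empty) batch like those compaction leaves behind for
    // idempotent/transactional producers: baseOffset = lastOffset = 3L, no records.
    DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2,
        23L /* producerId */, (short) 5 /* producerEpoch */, 10 /* baseSequence */,
        3L /* baseOffset */, 3L /* lastOffset */, 293 /* partitionLeaderEpoch */,
        TimestampType.CREATE_TIME, System.currentTimeMillis(),
        false /* isTransactional */, false /* isControlBatch */);
    buffer.flip();

    MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
        .filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
            @Override
            protected BatchRetention checkBatchRetention(RecordBatch batch) {
                return BatchRetention.RETAIN_EMPTY; // keep the empty batch header
            }
            @Override
            protected boolean shouldRetainRecord(RecordBatch batch, Record record) {
                return false; // drop every record
            }
        }, ByteBuffer.allocate(2048), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);

    // With the fix, the retained empty batch contributes its lastOffset, so
    // maxOffset() is 3L; previously it was left at -1L and the log append failed.
    assert result.maxOffset() == 3L;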
---
.../apache/kafka/common/record/MemoryRecords.java | 143 +++++++++++--------
.../clients/consumer/internals/FetcherTest.java | 4 +-
.../kafka/common/record/MemoryRecordsTest.java | 153 +++++++++++++++------
core/src/main/scala/kafka/log/LogCleaner.scala | 2 +-
.../test/scala/unit/kafka/log/LogCleanerTest.scala | 45 ++++--
5 files changed, 229 insertions(+), 118 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 55a4711..af62e09 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -160,20 +160,14 @@ public class MemoryRecords extends AbstractRecords {
private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
BufferSupplier decompressionBufferSupplier) {
- long maxTimestamp = RecordBatch.NO_TIMESTAMP;
- long maxOffset = -1L;
- long shallowOffsetOfMaxTimestamp = -1L;
- int messagesRead = 0;
- int bytesRead = 0; // bytes processed from `batches`
- int messagesRetained = 0;
- int bytesRetained = 0;
-
+ FilterResult filterResult = new FilterResult(destinationBuffer);
ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
for (MutableRecordBatch batch : batches) {
- bytesRead += batch.sizeInBytes();
-
+ long maxOffset = -1L;
BatchRetention batchRetention = filter.checkBatchRetention(batch);
+ filterResult.bytesRead += batch.sizeInBytes();
+
if (batchRetention == BatchRetention.DELETE)
continue;
@@ -189,7 +183,7 @@ public class MemoryRecords extends AbstractRecords {
try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
while (iterator.hasNext()) {
Record record = iterator.next();
- messagesRead += 1;
+ filterResult.messagesRead += 1;
if (filter.shouldRetainRecord(batch, record)) {
// Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
@@ -210,20 +204,11 @@ public class MemoryRecords extends AbstractRecords {
if (!retainedRecords.isEmpty()) {
if (writeOriginalBatch) {
batch.writeTo(bufferOutputStream);
- messagesRetained += retainedRecords.size();
- bytesRetained += batch.sizeInBytes();
- if (batch.maxTimestamp() > maxTimestamp) {
- maxTimestamp = batch.maxTimestamp();
- shallowOffsetOfMaxTimestamp = batch.lastOffset();
- }
+ filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
} else {
MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
MemoryRecords records = builder.build();
int filteredBatchSize = records.sizeInBytes();
-
- messagesRetained += retainedRecords.size();
- bytesRetained += filteredBatchSize;
-
if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
"(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
@@ -231,10 +216,8 @@ public class MemoryRecords extends AbstractRecords {
partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
MemoryRecordsBuilder.RecordsInfo info = builder.info();
- if (info.maxTimestamp > maxTimestamp) {
- maxTimestamp = info.maxTimestamp;
- shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
- }
+ filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
+ maxOffset, retainedRecords.size(), filteredBatchSize);
}
} else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
@@ -245,18 +228,19 @@ public class MemoryRecords extends AbstractRecords {
batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
batch.isTransactional(), batch.isControlBatch());
+ filterResult.updateRetainedBatchMetadata(batch, 0, true);
}
- // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
+ // If we had to allocate a new buffer to fit the filtered buffer (see KAFKA-5316), return early to
// avoid the need for additional allocations.
ByteBuffer outputBuffer = bufferOutputStream.buffer();
- if (outputBuffer != destinationBuffer)
- return new FilterResult(outputBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
- maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+ if (outputBuffer != destinationBuffer) {
+ filterResult.outputBuffer = outputBuffer;
+ return filterResult;
+ }
}
- return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
- maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+ return filterResult;
}
private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
@@ -369,33 +353,76 @@ public class MemoryRecords extends AbstractRecords {
}
public static class FilterResult {
- public final ByteBuffer output;
- public final int messagesRead;
- public final int bytesRead;
- public final int messagesRetained;
- public final int bytesRetained;
- public final long maxOffset;
- public final long maxTimestamp;
- public final long shallowOffsetOfMaxTimestamp;
-
- // Note that `bytesRead` should contain only bytes from batches that have been processed,
- // i.e. bytes from `messagesRead` and any discarded batches.
- public FilterResult(ByteBuffer output,
- int messagesRead,
- int bytesRead,
- int messagesRetained,
- int bytesRetained,
- long maxOffset,
- long maxTimestamp,
- long shallowOffsetOfMaxTimestamp) {
- this.output = output;
- this.messagesRead = messagesRead;
- this.bytesRead = bytesRead;
- this.messagesRetained = messagesRetained;
- this.bytesRetained = bytesRetained;
- this.maxOffset = maxOffset;
- this.maxTimestamp = maxTimestamp;
- this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+ private ByteBuffer outputBuffer;
+ private int messagesRead = 0;
+ // Note that `bytesRead` should contain only bytes from batches that have been processed, i.e. bytes from
+ // `messagesRead` and any discarded batches.
+ private int bytesRead = 0;
+ private int messagesRetained = 0;
+ private int bytesRetained = 0;
+ private long maxOffset = -1L;
+ private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
+ private long shallowOffsetOfMaxTimestamp = -1L;
+
+ private FilterResult(ByteBuffer outputBuffer) {
+ this.outputBuffer = outputBuffer;
+ }
+
+ private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) {
+ int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes();
+ updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(),
+ retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
+ }
+
+ private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
+ int messagesRetained, int bytesRetained) {
+ validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset);
+ if (maxTimestamp > this.maxTimestamp) {
+ this.maxTimestamp = maxTimestamp;
+ this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+ }
+ this.maxOffset = Math.max(maxOffset, this.maxOffset);
+ this.messagesRetained += messagesRetained;
+ this.bytesRetained += bytesRetained;
+ }
+
+ private void validateBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset) {
+ if (maxTimestamp != RecordBatch.NO_TIMESTAMP && shallowOffsetOfMaxTimestamp < 0)
+ throw new IllegalArgumentException("shallowOffset undefined for maximum timestamp " + maxTimestamp);
+ if (maxOffset < 0)
+ throw new IllegalArgumentException("maxOffset undefined");
+ }
+
+ public ByteBuffer outputBuffer() {
+ return outputBuffer;
+ }
+
+ public int messagesRead() {
+ return messagesRead;
+ }
+
+ public int bytesRead() {
+ return bytesRead;
+ }
+
+ public int messagesRetained() {
+ return messagesRetained;
+ }
+
+ public int bytesRetained() {
+ return bytesRetained;
+ }
+
+ public long maxOffset() {
+ return maxOffset;
+ }
+
+ public long maxTimestamp() {
+ return maxTimestamp;
+ }
+
+ public long shallowOffsetOfMaxTimestamp() {
+ return shallowOffsetOfMaxTimestamp;
}
}
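
Call sites migrate from FilterResult's former public fields to the new accessor methods; a representative before/after, mirroring the FetcherTest and LogCleaner updates further down:

    // Before this patch: direct field access
    //     result.output.flip();
    //     MemoryRecords compacted = MemoryRecords.readableRecords(result.output);
    // After this patch: accessor methods
    result.outputBuffer().flip();
    MemoryRecords compacted = MemoryRecords.readableRecords(result.outputBuffer());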
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1a82faa..fd550d61 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2113,8 +2113,8 @@ public class FetcherTest {
return record.key() != null;
}
}, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
- result.output.flip();
- MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.output);
+ result.outputBuffer().flip();
+ MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
subscriptions.assignFromUser(singleton(tp0));
subscriptions.seek(tp0, 0);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 61d8a00..579fb74 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -252,6 +252,7 @@ public class MemoryRecordsTest {
long baseOffset = 3L;
int baseSequence = 10;
int partitionLeaderEpoch = 293;
+ int numRecords = 2;
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME,
baseOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional,
@@ -259,22 +260,34 @@ public class MemoryRecordsTest {
builder.append(11L, "2".getBytes(), "b".getBytes());
builder.append(12L, "3".getBytes(), "c".getBytes());
builder.close();
+ MemoryRecords records = builder.build();
ByteBuffer filtered = ByteBuffer.allocate(2048);
- builder.build().filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
- @Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
- // retain all batches
- return BatchRetention.RETAIN_EMPTY;
- }
-
- @Override
- protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
- // delete the records
- return false;
- }
- }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
-
+ MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
+ new MemoryRecords.RecordFilter() {
+ @Override
+ protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ // retain all batches
+ return BatchRetention.RETAIN_EMPTY;
+ }
+
+ @Override
+ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+ // delete the records
+ return false;
+ }
+ }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+ // Verify filter result
+ assertEquals(numRecords, filterResult.messagesRead());
+ assertEquals(records.sizeInBytes(), filterResult.bytesRead());
+ assertEquals(baseOffset + 1, filterResult.maxOffset());
+ assertEquals(0, filterResult.messagesRetained());
+ assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
+ assertEquals(12, filterResult.maxTimestamp());
+ assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp());
+
+ // Verify filtered records
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
@@ -295,6 +308,55 @@ public class MemoryRecordsTest {
}
@Test
+ public void testEmptyBatchRetention() {
+ if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+ ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+ long producerId = 23L;
+ short producerEpoch = 5;
+ long baseOffset = 3L;
+ int baseSequence = 10;
+ int partitionLeaderEpoch = 293;
+ long timestamp = System.currentTimeMillis();
+
+ DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
+ baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME,
+ timestamp, false, false);
+ buffer.flip();
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
+ new MemoryRecords.RecordFilter() {
+ @Override
+ protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ // retain all batches
+ return BatchRetention.RETAIN_EMPTY;
+ }
+
+ @Override
+ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+ return false;
+ }
+ }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+ // Verify filter result
+ assertEquals(0, filterResult.messagesRead());
+ assertEquals(records.sizeInBytes(), filterResult.bytesRead());
+ assertEquals(baseOffset, filterResult.maxOffset());
+ assertEquals(0, filterResult.messagesRetained());
+ assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
+ assertEquals(timestamp, filterResult.maxTimestamp());
+ assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp());
+ assertTrue(filterResult.outputBuffer().position() > 0);
+
+ // Verify filtered records
+ filtered.flip();
+ MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+ assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filteredRecords.sizeInBytes());
+ }
+ }
+
+ @Test
public void testEmptyBatchDeletion() {
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
for (final BatchRetention deleteRetention : Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) {
@@ -304,25 +366,32 @@ public class MemoryRecordsTest {
long baseOffset = 3L;
int baseSequence = 10;
int partitionLeaderEpoch = 293;
+ long timestamp = System.currentTimeMillis();
DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME,
- System.currentTimeMillis(), false, false);
+ timestamp, false, false);
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
- MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
- @Override
- protected BatchRetention checkBatchRetention(RecordBatch batch) {
- return deleteRetention;
- }
-
- @Override
- protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
- return false;
- }
- }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
-
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
+ new MemoryRecords.RecordFilter() {
+ @Override
+ protected BatchRetention checkBatchRetention(RecordBatch batch) {
+ return deleteRetention;
+ }
+
+ @Override
+ protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+ return false;
+ }
+ }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+ // Verify filter result
+ assertEquals(0, filterResult.outputBuffer().position());
+
+ // Verify filtered records
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
assertEquals(0, filteredRecords.sizeInBytes());
@@ -591,15 +660,15 @@ public class MemoryRecordsTest {
MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
.filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE,
- BufferSupplier.NO_CACHING);
+ BufferSupplier.NO_CACHING);
- buffer.position(buffer.position() + result.bytesRead);
- result.output.flip();
+ buffer.position(buffer.position() + result.bytesRead());
+ result.outputBuffer().flip();
- if (output != result.output)
+ if (output != result.outputBuffer())
assertEquals(0, output.position());
- MemoryRecords filtered = MemoryRecords.readableRecords(result.output);
+ MemoryRecords filtered = MemoryRecords.readableRecords(result.outputBuffer());
records.addAll(TestUtils.toList(filtered.records()));
}
@@ -623,9 +692,9 @@ public class MemoryRecordsTest {
break;
case RecordBatch.MAGIC_VALUE_V1:
assertEquals("[(record=LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, " +
- "crc=97210616, CreateTime=1000000, key=4 bytes, value=6 bytes))), (record=LegacyRecordBatch(offset=1, " +
- "Record(magic=1, attributes=0, compression=NONE, crc=3535988507, CreateTime=1000001, key=4 bytes, " +
- "value=6 bytes)))]",
+ "crc=97210616, CreateTime=1000000, key=4 bytes, value=6 bytes))), (record=LegacyRecordBatch(offset=1, " +
+ "Record(magic=1, attributes=0, compression=NONE, crc=3535988507, CreateTime=1000001, key=4 bytes, " +
+ "value=6 bytes)))]",
memoryRecords.toString());
break;
case RecordBatch.MAGIC_VALUE_V2:
@@ -669,16 +738,16 @@ public class MemoryRecordsTest {
filtered.flip();
- assertEquals(7, result.messagesRead);
- assertEquals(4, result.messagesRetained);
- assertEquals(buffer.limit(), result.bytesRead);
- assertEquals(filtered.limit(), result.bytesRetained);
+ assertEquals(7, result.messagesRead());
+ assertEquals(4, result.messagesRetained());
+ assertEquals(buffer.limit(), result.bytesRead());
+ assertEquals(filtered.limit(), result.bytesRetained());
if (magic > RecordBatch.MAGIC_VALUE_V0) {
- assertEquals(20L, result.maxTimestamp);
+ assertEquals(20L, result.maxTimestamp());
if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2)
- assertEquals(4L, result.shallowOffsetOfMaxTimestamp);
+ assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
else
- assertEquals(5L, result.shallowOffsetOfMaxTimestamp);
+ assertEquals(5L, result.shallowOffsetOfMaxTimestamp());
}
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
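
A note on the shallowOffsetOfMaxTimestamp assertions above: the "shallow" offset is the last offset of the outermost batch that holds the maximum timestamp. For uncompressed v0/v1 logs each record is its own shallow batch, so the expected value is the record's own offset (4L); for compressed or v2 logs it is the wrapper batch's last offset (5L).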
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 91ddbf0..04b284c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -606,7 +606,7 @@ private[log] class Cleaner(val id: Int,
position += result.bytesRead
// if any messages are to be retained, write them out
- val outputBuffer = result.output
+ val outputBuffer = result.outputBuffer
if (outputBuffer.position() > 0) {
outputBuffer.flip()
val retained = MemoryRecords.readableRecords(outputBuffer)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 0240707..73dfa7e 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -412,42 +412,57 @@ class LogCleanerTest extends JUnitSuite {
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
val producerEpoch = 0.toShort
- val producerId = 1L
- val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+ val producer1 = appendTransactionalAsLeader(log, 1L, producerEpoch)
+ val producer2 = appendTransactionalAsLeader(log, 2L, producerEpoch)
- appendProducer(Seq(2, 3)) // batch last offset is 1
- log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ // [{Producer1: 2, 3}]
+ producer1(Seq(2, 3)) // offsets 0, 1
log.roll()
- log.appendAsLeader(record(2, 2), leaderEpoch = 0)
- log.appendAsLeader(record(3, 3), leaderEpoch = 0)
+ // [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}]
+ producer2(Seq(2, 3)) // offsets 2, 3
+ log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 4
+ log.roll()
+
+ // [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}], [{2}, {3}, {Producer1: Commit}]
+ // {0, 1}, {2, 3}, {4}, {5}, {6}, {7} ==> Offsets
+ log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 5
+ log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 6
+ log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 7
log.roll()
// first time through the records are removed
+ // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}]
var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
assertEquals(List(2, 3), LogTest.keysInLog(log))
- assertEquals(List(2, 3, 4), offsetsInLog(log)) // commit marker is retained
- assertEquals(List(1, 2, 3, 4), lastOffsetsPerBatchInLog(log)) // empty batch is retained
+ assertEquals(List(4, 5, 6), offsetsInLog(log))
+ assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
// the empty batch remains if cleaned again because it still holds the last sequence
+ // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
assertEquals(List(2, 3), LogTest.keysInLog(log))
- assertEquals(List(2, 3, 4), offsetsInLog(log)) // commit marker is still retained
- assertEquals(List(1, 2, 3, 4), lastOffsetsPerBatchInLog(log)) // empty batch is retained
+ assertEquals(List(4, 5, 6), offsetsInLog(log))
+ assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
// append a new record from the producer to allow cleaning of the empty batch
- appendProducer(Seq(1))
+ // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] [{Producer2: 1}, {Producer2: Commit}]
+ // {1}, {3}, {4}, {5}, {6}, {8}, {9} ==> Offsets
+ producer2(Seq(1)) // offset 8
+ log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 9
log.roll()
+ // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
- assertEquals(List(2, 3, 4, 5), offsetsInLog(log)) // commit marker is still retained
- assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log)) // empty batch should be gone
+ assertEquals(List(4, 5, 6, 8, 9), offsetsInLog(log))
+ assertEquals(List(1, 4, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
+ // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}]
dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
- assertEquals(List(3, 4, 5), offsetsInLog(log)) // commit marker is gone
- assertEquals(List(3, 4, 5), lastOffsetsPerBatchInLog(log)) // empty batch is gone
+ assertEquals(List(5, 6, 8, 9), offsetsInLog(log))
+ assertEquals(List(1, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
}
@Test