Posted to commits@kafka.apache.org by jg...@apache.org on 2018/09/09 01:01:07 UTC

[kafka] branch trunk updated: KAFKA-7385; Fix log cleaner behavior when only empty batches are retained (#5623)

This is an automated email from the ASF dual-hosted git repository.

jgus pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 958cdca  KAFKA-7385; Fix log cleaner behavior when only empty batches are retained (#5623)
958cdca is described below

commit 958cdca9bece8b65ceb204e1c7a14cf44729bb66
Author: Dhruvil Shah <dh...@confluent.io>
AuthorDate: Sat Sep 8 18:01:01 2018 -0700

    KAFKA-7385; Fix log cleaner behavior when only empty batches are retained (#5623)
    
    With idempotent/transactional producers, we may leave empty batches in the log during log compaction. When filtering the data, we keep track of state such as the `maxOffset` and `maxTimestamp` of the filtered data. This patch ensures that state is maintained correctly when only empty batches are retained in `MemoryRecords#filterTo`. Without this patch, `maxOffset` was not initialized in this edge case, which led us to append data to the log with `maxOffset` = -1L, causing the append to fail.
    
    Reviewers: Jason Gustafson <ja...@confluent.io>
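
For context, here is an illustrative sketch (not part of the patch) of the edge case being fixed. It is modeled on the new testEmptyBatchRetention test in the diff below and assumes the post-patch FilterResult accessors and the clients-jar import paths of the time: filtering a header-only batch with BatchRetention.RETAIN_EMPTY now yields a maxOffset equal to the batch's last offset rather than -1L, so the subsequent log append no longer fails.

import java.nio.ByteBuffer;

import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.record.DefaultRecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.BufferSupplier;

public class EmptyBatchFilterSketch {
    public static void main(String[] args) {
        // Build a header-only (empty) v2 batch, as log compaction can leave behind
        // for idempotent/transactional producers.
        ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
        DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, 23L, (short) 5,
                10, 3L, 3L, 293, TimestampType.CREATE_TIME, System.currentTimeMillis(), false, false);
        buffer.flip();

        // Filter the batch, retaining the empty header but discarding all records.
        MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer).filterTo(
                new TopicPartition("foo", 0),
                new MemoryRecords.RecordFilter() {
                    @Override
                    protected BatchRetention checkBatchRetention(RecordBatch batch) {
                        return BatchRetention.RETAIN_EMPTY;
                    }

                    @Override
                    protected boolean shouldRetainRecord(RecordBatch batch, Record record) {
                        return false;
                    }
                }, ByteBuffer.allocate(2048), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);

        // Before this patch, maxOffset stayed at -1L here, which broke the log append;
        // with the fix it reflects the retained empty batch's last offset (3L).
        System.out.println("maxOffset = " + result.maxOffset());
    }
}
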
---
 .../apache/kafka/common/record/MemoryRecords.java  | 143 +++++++++++--------
 .../clients/consumer/internals/FetcherTest.java    |   4 +-
 .../kafka/common/record/MemoryRecordsTest.java     | 153 +++++++++++++++------
 core/src/main/scala/kafka/log/LogCleaner.scala     |   2 +-
 .../test/scala/unit/kafka/log/LogCleanerTest.scala |  45 ++++--
 5 files changed, 229 insertions(+), 118 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 55a4711..af62e09 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -160,20 +160,14 @@ public class MemoryRecords extends AbstractRecords {
     private static FilterResult filterTo(TopicPartition partition, Iterable<MutableRecordBatch> batches,
                                          RecordFilter filter, ByteBuffer destinationBuffer, int maxRecordBatchSize,
                                          BufferSupplier decompressionBufferSupplier) {
-        long maxTimestamp = RecordBatch.NO_TIMESTAMP;
-        long maxOffset = -1L;
-        long shallowOffsetOfMaxTimestamp = -1L;
-        int messagesRead = 0;
-        int bytesRead = 0; // bytes processed from `batches`
-        int messagesRetained = 0;
-        int bytesRetained = 0;
-
+        FilterResult filterResult = new FilterResult(destinationBuffer);
         ByteBufferOutputStream bufferOutputStream = new ByteBufferOutputStream(destinationBuffer);
 
         for (MutableRecordBatch batch : batches) {
-            bytesRead += batch.sizeInBytes();
-
+            long maxOffset = -1L;
             BatchRetention batchRetention = filter.checkBatchRetention(batch);
+            filterResult.bytesRead += batch.sizeInBytes();
+
             if (batchRetention == BatchRetention.DELETE)
                 continue;
 
@@ -189,7 +183,7 @@ public class MemoryRecords extends AbstractRecords {
             try (final CloseableIterator<Record> iterator = batch.streamingIterator(decompressionBufferSupplier)) {
                 while (iterator.hasNext()) {
                     Record record = iterator.next();
-                    messagesRead += 1;
+                    filterResult.messagesRead += 1;
 
                     if (filter.shouldRetainRecord(batch, record)) {
                         // Check for log corruption due to KAFKA-4298. If we find it, make sure that we overwrite
@@ -210,20 +204,11 @@ public class MemoryRecords extends AbstractRecords {
             if (!retainedRecords.isEmpty()) {
                 if (writeOriginalBatch) {
                     batch.writeTo(bufferOutputStream);
-                    messagesRetained += retainedRecords.size();
-                    bytesRetained += batch.sizeInBytes();
-                    if (batch.maxTimestamp() > maxTimestamp) {
-                        maxTimestamp = batch.maxTimestamp();
-                        shallowOffsetOfMaxTimestamp = batch.lastOffset();
-                    }
+                    filterResult.updateRetainedBatchMetadata(batch, retainedRecords.size(), false);
                 } else {
                     MemoryRecordsBuilder builder = buildRetainedRecordsInto(batch, retainedRecords, bufferOutputStream);
                     MemoryRecords records = builder.build();
                     int filteredBatchSize = records.sizeInBytes();
-
-                    messagesRetained += retainedRecords.size();
-                    bytesRetained += filteredBatchSize;
-
                     if (filteredBatchSize > batch.sizeInBytes() && filteredBatchSize > maxRecordBatchSize)
                         log.warn("Record batch from {} with last offset {} exceeded max record batch size {} after cleaning " +
                                         "(new size is {}). Consumers with version earlier than 0.10.1.0 may need to " +
@@ -231,10 +216,8 @@ public class MemoryRecords extends AbstractRecords {
                                 partition, batch.lastOffset(), maxRecordBatchSize, filteredBatchSize);
 
                     MemoryRecordsBuilder.RecordsInfo info = builder.info();
-                    if (info.maxTimestamp > maxTimestamp) {
-                        maxTimestamp = info.maxTimestamp;
-                        shallowOffsetOfMaxTimestamp = info.shallowOffsetOfMaxTimestamp;
-                    }
+                    filterResult.updateRetainedBatchMetadata(info.maxTimestamp, info.shallowOffsetOfMaxTimestamp,
+                            maxOffset, retainedRecords.size(), filteredBatchSize);
                 }
             } else if (batchRetention == BatchRetention.RETAIN_EMPTY) {
                 if (batchMagic < RecordBatch.MAGIC_VALUE_V2)
@@ -245,18 +228,19 @@ public class MemoryRecords extends AbstractRecords {
                         batch.producerEpoch(), batch.baseSequence(), batch.baseOffset(), batch.lastOffset(),
                         batch.partitionLeaderEpoch(), batch.timestampType(), batch.maxTimestamp(),
                         batch.isTransactional(), batch.isControlBatch());
+                filterResult.updateRetainedBatchMetadata(batch, 0, true);
             }
 
-            // If we had to allocate a new buffer to fit the filtered output (see KAFKA-5316), return early to
+            // If we had to allocate a new buffer to fit the filtered buffer (see KAFKA-5316), return early to
             // avoid the need for additional allocations.
             ByteBuffer outputBuffer = bufferOutputStream.buffer();
-            if (outputBuffer != destinationBuffer)
-                return new FilterResult(outputBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
-                        maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+            if (outputBuffer != destinationBuffer) {
+                filterResult.outputBuffer = outputBuffer;
+                return filterResult;
+            }
         }
 
-        return new FilterResult(destinationBuffer, messagesRead, bytesRead, messagesRetained, bytesRetained,
-                maxOffset, maxTimestamp, shallowOffsetOfMaxTimestamp);
+        return filterResult;
     }
 
     private static MemoryRecordsBuilder buildRetainedRecordsInto(RecordBatch originalBatch,
@@ -369,33 +353,76 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     public static class FilterResult {
-        public final ByteBuffer output;
-        public final int messagesRead;
-        public final int bytesRead;
-        public final int messagesRetained;
-        public final int bytesRetained;
-        public final long maxOffset;
-        public final long maxTimestamp;
-        public final long shallowOffsetOfMaxTimestamp;
-
-        // Note that `bytesRead` should contain only bytes from batches that have been processed,
-        // i.e. bytes from `messagesRead` and any discarded batches.
-        public FilterResult(ByteBuffer output,
-                            int messagesRead,
-                            int bytesRead,
-                            int messagesRetained,
-                            int bytesRetained,
-                            long maxOffset,
-                            long maxTimestamp,
-                            long shallowOffsetOfMaxTimestamp) {
-            this.output = output;
-            this.messagesRead = messagesRead;
-            this.bytesRead = bytesRead;
-            this.messagesRetained = messagesRetained;
-            this.bytesRetained = bytesRetained;
-            this.maxOffset = maxOffset;
-            this.maxTimestamp = maxTimestamp;
-            this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+        private ByteBuffer outputBuffer;
+        private int messagesRead = 0;
+        // Note that `bytesRead` should contain only bytes from batches that have been processed, i.e. bytes from
+        // `messagesRead` and any discarded batches.
+        private int bytesRead = 0;
+        private int messagesRetained = 0;
+        private int bytesRetained = 0;
+        private long maxOffset = -1L;
+        private long maxTimestamp = RecordBatch.NO_TIMESTAMP;
+        private long shallowOffsetOfMaxTimestamp = -1L;
+
+        private FilterResult(ByteBuffer outputBuffer) {
+            this.outputBuffer = outputBuffer;
+        }
+
+        private void updateRetainedBatchMetadata(MutableRecordBatch retainedBatch, int numMessagesInBatch, boolean headerOnly) {
+            int bytesRetained = headerOnly ? DefaultRecordBatch.RECORD_BATCH_OVERHEAD : retainedBatch.sizeInBytes();
+            updateRetainedBatchMetadata(retainedBatch.maxTimestamp(), retainedBatch.lastOffset(),
+                    retainedBatch.lastOffset(), numMessagesInBatch, bytesRetained);
+        }
+
+        private void updateRetainedBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset,
+                                                int messagesRetained, int bytesRetained) {
+            validateBatchMetadata(maxTimestamp, shallowOffsetOfMaxTimestamp, maxOffset);
+            if (maxTimestamp > this.maxTimestamp) {
+                this.maxTimestamp = maxTimestamp;
+                this.shallowOffsetOfMaxTimestamp = shallowOffsetOfMaxTimestamp;
+            }
+            this.maxOffset = Math.max(maxOffset, this.maxOffset);
+            this.messagesRetained += messagesRetained;
+            this.bytesRetained += bytesRetained;
+        }
+
+        private void validateBatchMetadata(long maxTimestamp, long shallowOffsetOfMaxTimestamp, long maxOffset) {
+            if (maxTimestamp != RecordBatch.NO_TIMESTAMP && shallowOffsetOfMaxTimestamp < 0)
+                throw new IllegalArgumentException("shallowOffset undefined for maximum timestamp " + maxTimestamp);
+            if (maxOffset < 0)
+                throw new IllegalArgumentException("maxOffset undefined");
+        }
+
+        public ByteBuffer outputBuffer() {
+            return outputBuffer;
+        }
+
+        public int messagesRead() {
+            return messagesRead;
+        }
+
+        public int bytesRead() {
+            return bytesRead;
+        }
+
+        public int messagesRetained() {
+            return messagesRetained;
+        }
+
+        public int bytesRetained() {
+            return bytesRetained;
+        }
+
+        public long maxOffset() {
+            return maxOffset;
+        }
+
+        public long maxTimestamp() {
+            return maxTimestamp;
+        }
+
+        public long shallowOffsetOfMaxTimestamp() {
+            return shallowOffsetOfMaxTimestamp;
         }
     }
 
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 1a82faa..fd550d61 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -2113,8 +2113,8 @@ public class FetcherTest {
                 return record.key() != null;
             }
         }, ByteBuffer.allocate(1024), Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
-        result.output.flip();
-        MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.output);
+        result.outputBuffer().flip();
+        MemoryRecords compactedRecords = MemoryRecords.readableRecords(result.outputBuffer());
 
         subscriptions.assignFromUser(singleton(tp0));
         subscriptions.seek(tp0, 0);
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 61d8a00..579fb74 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -252,6 +252,7 @@ public class MemoryRecordsTest {
                 long baseOffset = 3L;
                 int baseSequence = 10;
                 int partitionLeaderEpoch = 293;
+                int numRecords = 2;
 
                 MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME,
                         baseOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch, baseSequence, isTransactional,
@@ -259,22 +260,34 @@ public class MemoryRecordsTest {
                 builder.append(11L, "2".getBytes(), "b".getBytes());
                 builder.append(12L, "3".getBytes(), "c".getBytes());
                 builder.close();
+                MemoryRecords records = builder.build();
 
                 ByteBuffer filtered = ByteBuffer.allocate(2048);
-                builder.build().filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
-                    @Override
-                    protected BatchRetention checkBatchRetention(RecordBatch batch) {
-                        // retain all batches
-                        return BatchRetention.RETAIN_EMPTY;
-                    }
-
-                    @Override
-                    protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
-                        // delete the records
-                        return false;
-                    }
-                }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
-
+                MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
+                        new MemoryRecords.RecordFilter() {
+                            @Override
+                            protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                                // retain all batches
+                                return BatchRetention.RETAIN_EMPTY;
+                            }
+
+                            @Override
+                            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                                // delete the records
+                                return false;
+                            }
+                        }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+                // Verify filter result
+                assertEquals(numRecords, filterResult.messagesRead());
+                assertEquals(records.sizeInBytes(), filterResult.bytesRead());
+                assertEquals(baseOffset + 1, filterResult.maxOffset());
+                assertEquals(0, filterResult.messagesRetained());
+                assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
+                assertEquals(12, filterResult.maxTimestamp());
+                assertEquals(baseOffset + 1, filterResult.shallowOffsetOfMaxTimestamp());
+
+                // Verify filtered records
                 filtered.flip();
                 MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
 
@@ -295,6 +308,55 @@ public class MemoryRecordsTest {
     }
 
     @Test
+    public void testEmptyBatchRetention() {
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            ByteBuffer buffer = ByteBuffer.allocate(DefaultRecordBatch.RECORD_BATCH_OVERHEAD);
+            long producerId = 23L;
+            short producerEpoch = 5;
+            long baseOffset = 3L;
+            int baseSequence = 10;
+            int partitionLeaderEpoch = 293;
+            long timestamp = System.currentTimeMillis();
+
+            DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
+                    baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME,
+                    timestamp, false, false);
+            buffer.flip();
+
+            ByteBuffer filtered = ByteBuffer.allocate(2048);
+            MemoryRecords records = MemoryRecords.readableRecords(buffer);
+            MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
+                    new MemoryRecords.RecordFilter() {
+                        @Override
+                        protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                            // retain all batches
+                            return BatchRetention.RETAIN_EMPTY;
+                        }
+
+                        @Override
+                        protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                            return false;
+                        }
+                    }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+            // Verify filter result
+            assertEquals(0, filterResult.messagesRead());
+            assertEquals(records.sizeInBytes(), filterResult.bytesRead());
+            assertEquals(baseOffset, filterResult.maxOffset());
+            assertEquals(0, filterResult.messagesRetained());
+            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filterResult.bytesRetained());
+            assertEquals(timestamp, filterResult.maxTimestamp());
+            assertEquals(baseOffset, filterResult.shallowOffsetOfMaxTimestamp());
+            assertTrue(filterResult.outputBuffer().position() > 0);
+
+            // Verify filtered records
+            filtered.flip();
+            MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+            assertEquals(DefaultRecordBatch.RECORD_BATCH_OVERHEAD, filteredRecords.sizeInBytes());
+        }
+    }
+
+    @Test
     public void testEmptyBatchDeletion() {
         if (magic >= RecordBatch.MAGIC_VALUE_V2) {
             for (final BatchRetention deleteRetention : Arrays.asList(BatchRetention.DELETE, BatchRetention.DELETE_EMPTY)) {
@@ -304,25 +366,32 @@ public class MemoryRecordsTest {
                 long baseOffset = 3L;
                 int baseSequence = 10;
                 int partitionLeaderEpoch = 293;
+                long timestamp = System.currentTimeMillis();
 
                 DefaultRecordBatch.writeEmptyHeader(buffer, RecordBatch.MAGIC_VALUE_V2, producerId, producerEpoch,
                         baseSequence, baseOffset, baseOffset, partitionLeaderEpoch, TimestampType.CREATE_TIME,
-                        System.currentTimeMillis(), false, false);
+                        timestamp, false, false);
                 buffer.flip();
 
                 ByteBuffer filtered = ByteBuffer.allocate(2048);
-                MemoryRecords.readableRecords(buffer).filterTo(new TopicPartition("foo", 0), new MemoryRecords.RecordFilter() {
-                    @Override
-                    protected BatchRetention checkBatchRetention(RecordBatch batch) {
-                        return deleteRetention;
-                    }
-
-                    @Override
-                    protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
-                        return false;
-                    }
-                }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
-
+                MemoryRecords records = MemoryRecords.readableRecords(buffer);
+                MemoryRecords.FilterResult filterResult = records.filterTo(new TopicPartition("foo", 0),
+                        new MemoryRecords.RecordFilter() {
+                            @Override
+                            protected BatchRetention checkBatchRetention(RecordBatch batch) {
+                                return deleteRetention;
+                            }
+
+                            @Override
+                            protected boolean shouldRetainRecord(RecordBatch recordBatch, Record record) {
+                                return false;
+                            }
+                        }, filtered, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
+
+                // Verify filter result
+                assertEquals(0, filterResult.outputBuffer().position());
+
+                // Verify filtered records
                 filtered.flip();
                 MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
                 assertEquals(0, filteredRecords.sizeInBytes());
@@ -591,15 +660,15 @@ public class MemoryRecordsTest {
 
             MemoryRecords.FilterResult result = MemoryRecords.readableRecords(buffer)
                     .filterTo(new TopicPartition("foo", 0), new RetainNonNullKeysFilter(), output, Integer.MAX_VALUE,
-                              BufferSupplier.NO_CACHING);
+                            BufferSupplier.NO_CACHING);
 
-            buffer.position(buffer.position() + result.bytesRead);
-            result.output.flip();
+            buffer.position(buffer.position() + result.bytesRead());
+            result.outputBuffer().flip();
 
-            if (output != result.output)
+            if (output != result.outputBuffer())
                 assertEquals(0, output.position());
 
-            MemoryRecords filtered = MemoryRecords.readableRecords(result.output);
+            MemoryRecords filtered = MemoryRecords.readableRecords(result.outputBuffer());
             records.addAll(TestUtils.toList(filtered.records()));
         }
 
@@ -623,9 +692,9 @@ public class MemoryRecordsTest {
                 break;
             case RecordBatch.MAGIC_VALUE_V1:
                 assertEquals("[(record=LegacyRecordBatch(offset=0, Record(magic=1, attributes=0, compression=NONE, " +
-                        "crc=97210616, CreateTime=1000000, key=4 bytes, value=6 bytes))), (record=LegacyRecordBatch(offset=1, " +
-                        "Record(magic=1, attributes=0, compression=NONE, crc=3535988507, CreateTime=1000001, key=4 bytes, " +
-                        "value=6 bytes)))]",
+                                "crc=97210616, CreateTime=1000000, key=4 bytes, value=6 bytes))), (record=LegacyRecordBatch(offset=1, " +
+                                "Record(magic=1, attributes=0, compression=NONE, crc=3535988507, CreateTime=1000001, key=4 bytes, " +
+                                "value=6 bytes)))]",
                         memoryRecords.toString());
                 break;
             case RecordBatch.MAGIC_VALUE_V2:
@@ -669,16 +738,16 @@ public class MemoryRecordsTest {
 
         filtered.flip();
 
-        assertEquals(7, result.messagesRead);
-        assertEquals(4, result.messagesRetained);
-        assertEquals(buffer.limit(), result.bytesRead);
-        assertEquals(filtered.limit(), result.bytesRetained);
+        assertEquals(7, result.messagesRead());
+        assertEquals(4, result.messagesRetained());
+        assertEquals(buffer.limit(), result.bytesRead());
+        assertEquals(filtered.limit(), result.bytesRetained());
         if (magic > RecordBatch.MAGIC_VALUE_V0) {
-            assertEquals(20L, result.maxTimestamp);
+            assertEquals(20L, result.maxTimestamp());
             if (compression == CompressionType.NONE && magic < RecordBatch.MAGIC_VALUE_V2)
-                assertEquals(4L, result.shallowOffsetOfMaxTimestamp);
+                assertEquals(4L, result.shallowOffsetOfMaxTimestamp());
             else
-                assertEquals(5L, result.shallowOffsetOfMaxTimestamp);
+                assertEquals(5L, result.shallowOffsetOfMaxTimestamp());
         }
 
         MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 91ddbf0..04b284c 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -606,7 +606,7 @@ private[log] class Cleaner(val id: Int,
       position += result.bytesRead
 
       // if any messages are to be retained, write them out
-      val outputBuffer = result.output
+      val outputBuffer = result.outputBuffer
       if (outputBuffer.position() > 0) {
         outputBuffer.flip()
         val retained = MemoryRecords.readableRecords(outputBuffer)
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 0240707..73dfa7e 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -412,42 +412,57 @@ class LogCleanerTest extends JUnitSuite {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     val producerEpoch = 0.toShort
-    val producerId = 1L
-    val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+    val producer1 = appendTransactionalAsLeader(log, 1L, producerEpoch)
+    val producer2 = appendTransactionalAsLeader(log, 2L, producerEpoch)
 
-    appendProducer(Seq(2, 3)) // batch last offset is 1
-    log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+    // [{Producer1: 2, 3}]
+    producer1(Seq(2, 3)) // offsets 0, 1
     log.roll()
 
-    log.appendAsLeader(record(2, 2), leaderEpoch = 0)
-    log.appendAsLeader(record(3, 3), leaderEpoch = 0)
+    // [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}]
+    producer2(Seq(2, 3)) // offsets 2, 3
+    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 4
+    log.roll()
+
+    // [{Producer1: 2, 3}], [{Producer2: 2, 3}, {Producer2: Commit}], [{2}, {3}, {Producer1: Commit}]
+    //  {0, 1},              {2, 3},            {4},                   {5}, {6}, {7} ==> Offsets
+    log.appendAsLeader(record(2, 2), leaderEpoch = 0) // offset 5
+    log.appendAsLeader(record(3, 3), leaderEpoch = 0) // offset 6
+    log.appendAsLeader(commitMarker(1L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 7
     log.roll()
 
     // first time through the records are removed
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}]
     var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
-    assertEquals(List(2, 3, 4), offsetsInLog(log)) // commit marker is retained
-    assertEquals(List(1, 2, 3, 4), lastOffsetsPerBatchInLog(log)) // empty batch is retained
+    assertEquals(List(4, 5, 6), offsetsInLog(log))
+    assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
 
     // the empty batch remains if cleaned again because it still holds the last sequence
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3), LogTest.keysInLog(log))
-    assertEquals(List(2, 3, 4), offsetsInLog(log)) // commit marker is still retained
-    assertEquals(List(1, 2, 3, 4), lastOffsetsPerBatchInLog(log)) // empty batch is retained
+    assertEquals(List(4, 5, 6), offsetsInLog(log))
+    assertEquals(List(1, 3, 4, 5, 6), lastOffsetsPerBatchInLog(log))
 
     // append a new record from the producer to allow cleaning of the empty batch
-    appendProducer(Seq(1))
+    // [{Producer1: EmptyBatch}, {Producer2: EmptyBatch}, {Producer2: Commit}, {2}, {3}] [{Producer2: 1}, {Producer2: Commit}]
+    //  {1},                     {3},                     {4},                 {5}, {6},  {8},            {9} ==> Offsets
+    producer2(Seq(1)) // offset 8
+    log.appendAsLeader(commitMarker(2L, producerEpoch), leaderEpoch = 0, isFromClient = false) // offset 9
     log.roll()
 
+    // Expected State: [{Producer1: EmptyBatch}, {Producer2: Commit}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
-    assertEquals(List(2, 3, 4, 5), offsetsInLog(log)) // commit marker is still retained
-    assertEquals(List(2, 3, 4, 5), lastOffsetsPerBatchInLog(log)) // empty batch should be gone
+    assertEquals(List(4, 5, 6, 8, 9), offsetsInLog(log))
+    assertEquals(List(1, 4, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
 
+    // Expected State: [{Producer1: EmptyBatch}, {2}, {3}, {Producer2: 1}, {Producer2: Commit}]
     dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = Long.MaxValue)._1
     assertEquals(List(2, 3, 1), LogTest.keysInLog(log))
-    assertEquals(List(3, 4, 5), offsetsInLog(log)) // commit marker is gone
-    assertEquals(List(3, 4, 5), lastOffsetsPerBatchInLog(log)) // empty batch is gone
+    assertEquals(List(5, 6, 8, 9), offsetsInLog(log))
+    assertEquals(List(1, 5, 6, 8, 9), lastOffsetsPerBatchInLog(log))
   }
 
   @Test