Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/26 08:50:43 UTC

kafka git commit: MINOR: Preserve the base offset of the original record batch in V2

Repository: kafka
Updated Branches:
  refs/heads/trunk 02c0c3b01 -> 374336382


MINOR: Preserve the base offset of the original record batch in V2

The previous code did not handle this correctly if a batch was
compacted more than once.

Also add test case for duplicate check after log cleaning and
improve various comments.

Author: Jason Gustafson <ja...@confluent.io>

Reviewers: Ismael Juma <is...@juma.me.uk>

Closes #3145 from hachikuji/minor-improve-base-sequence-docs
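
For context, a minimal sketch of the behaviour this change relies on (illustrative only, not part of the commit; the class name and the expected values in the comments mirror the new MemoryRecordsTest case): build a v2 batch whose record offsets already contain gaps, as a compacted batch would, and read the original bounds back from the batch header.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.TimestampType;

    public class CompactedBatchOffsetsSketch {
        public static void main(String[] args) {
            ByteBuffer buffer = ByteBuffer.allocate(2048);

            // Build a v2 batch whose record offsets already contain gaps, as a
            // compacted batch would after the cleaner removed some records.
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            builder.appendWithOffset(5L, 10L, null, "a".getBytes());
            builder.appendWithOffset(8L, 11L, "1".getBytes(), "b".getBytes());
            builder.appendWithOffset(10L, 12L, null, "c".getBytes());
            builder.close();
            buffer.flip();

            for (RecordBatch batch : MemoryRecords.readableRecords(buffer).batches()) {
                // With magic v2 the batch header carries the original bounds, so they
                // survive log cleaning and can be read without deep iteration.
                System.out.println("baseOffset=" + batch.baseOffset()    // 0
                        + " lastOffset=" + batch.lastOffset()            // 10
                        + " nextOffset=" + batch.nextOffset());          // 11
            }
        }
    }

With older magic versions there is no batch-level header to preserve, so the base offset falls back to the first retained record, which is why the filter logic in MemoryRecords.java below branches on the magic version.
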


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37433638
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37433638
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37433638

Branch: refs/heads/trunk
Commit: 37433638271718344498d695d5da08db12c24eed
Parents: 02c0c3b
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri May 26 09:41:17 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 26 09:46:09 2017 +0100

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  4 +-
 .../record/AbstractLegacyRecordBatch.java       |  2 +-
 .../kafka/common/record/DefaultRecord.java      |  4 +-
 .../kafka/common/record/DefaultRecordBatch.java |  9 +-
 .../kafka/common/record/MemoryRecords.java      |  9 +-
 .../common/record/MemoryRecordsBuilder.java     |  4 +-
 .../org/apache/kafka/common/record/Record.java  |  2 +-
 .../apache/kafka/common/record/RecordBatch.java | 24 +++--
 .../kafka/common/record/MemoryRecordsTest.java  | 64 +++++++++++++-
 core/src/main/scala/kafka/log/Log.scala         |  5 +-
 .../scala/unit/kafka/log/LogCleanerTest.scala   | 92 ++++++++++++++++----
 11 files changed, 178 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 01bd0e5..cd32850 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1035,9 +1035,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
                         if (containsAbortMarker(currentBatch)) {
                             abortedProducerIds.remove(producerId);
                         } else if (isBatchAborted(currentBatch)) {
-                            log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}",
+                            log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition {}",
                                     producerId, currentBatch.baseOffset(), partition);
-                            nextFetchOffset = currentBatch.lastOffset() + 1;
+                            nextFetchOffset = currentBatch.nextOffset();
                             continue;
                         }
                     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 6ce3ba3..e028988 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -186,7 +186,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
     }
 
     @Override
-    public long sequence() {
+    public int sequence() {
         return RecordBatch.NO_SEQUENCE;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 9d0cd7e..05b5bb2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -60,7 +60,7 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
  *  ----------------
  *
  * The offset and timestamp deltas compute the difference relative to the base offset and
- * base timestamp of the log entry that this record is contained in.
+ * base timestamp of the batch that this record is contained in.
  */
 public class DefaultRecord implements Record {
 
@@ -102,7 +102,7 @@ public class DefaultRecord implements Record {
     }
 
     @Override
-    public long sequence() {
+    public int sequence() {
         return sequence;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 13f958d..4e52d61 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -44,7 +44,7 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
  *  Magic => Int8
  *  CRC => Uint32
  *  Attributes => Int16
- *  LastOffsetDelta => Int32
+ *  LastOffsetDelta => Int32 // also serves as LastSequenceDelta
  *  BaseTimestamp => Int64
  *  MaxTimestamp => Int64
  *  ProducerId => Int64
@@ -61,6 +61,13 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
  * computation to avoid the need to recompute the CRC when this field is assigned for every batch that is received by
  * the broker. The CRC-32C (Castagnoli) polynomial is used for the computation.
  *
+ * On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence
+ * numbers from the original batch when the log is cleaned. This is required in order to be able to restore the
+ * producer's state when the log is reloaded. If we did not retain the last sequence number, for example, then
+ * after a partition leader failure, the producer might see an OutOfSequence error. The base sequence number must
+ * be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates by verifying
+ * that the first and last sequence numbers of the incoming batch match the last from that producer).
+ *
  * The current attributes are given below:
  *
  *  -------------------------------------------------------------------------------------------------
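
An illustrative sketch of the duplicate check described in the comment above (not code from this commit; the class, method, and parameter names are hypothetical). The broker remembers the base and last sequence numbers of the last batch appended by each producer, and an incoming batch counts as a duplicate only if both endpoints match; because magic v2 preserves both numbers through compaction, the check still works after the log has been cleaned and reloaded.

    public class DuplicateCheckSketch {
        static boolean isDuplicate(int incomingBaseSeq, int incomingLastSeq,
                                   int cachedBaseSeq, int cachedLastSeq) {
            // Both the first and the last sequence number of the incoming batch must
            // match the values cached for that producer's most recent append.
            return incomingBaseSeq == cachedBaseSeq && incomingLastSeq == cachedLastSeq;
        }

        public static void main(String[] args) {
            // A retried batch with sequences [3, 5] against cached [3, 5] is a duplicate;
            // a new batch with sequences [6, 8] is not.
            System.out.println(isDuplicate(3, 5, 3, 5)); // true
            System.out.println(isDuplicate(6, 8, 3, 5)); // false
        }
    }
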

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 7391e7e..d3bdee2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -141,13 +141,9 @@ public class MemoryRecords extends AbstractRecords {
 
             byte batchMagic = batch.magic();
             boolean writeOriginalEntry = true;
-            long firstOffset = -1;
             List<Record> retainedRecords = new ArrayList<>();
 
             for (Record record : batch) {
-                if (firstOffset < 0)
-                    firstOffset = record.offset();
-
                 messagesRead += 1;
 
                 if (filter.shouldRetain(batch, record)) {
@@ -178,8 +174,11 @@ public class MemoryRecords extends AbstractRecords {
                 ByteBuffer slice = destinationBuffer.slice();
                 TimestampType timestampType = batch.timestampType();
                 long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
+                long baseOffset = batchMagic >= RecordBatch.MAGIC_VALUE_V2 ?
+                        batch.baseOffset() : retainedRecords.get(0).offset();
+
                 MemoryRecordsBuilder builder = builder(slice, batch.magic(), batch.compressionType(), timestampType,
-                        firstOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(),
+                        baseOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(),
                         batch.isTransactional(), batch.partitionLeaderEpoch());
 
                 for (Record record : retainedRecords)

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index bc25d75..e055aa5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -213,7 +213,7 @@ public class MemoryRecordsBuilder {
         }
     }
 
-    public void setProducerState(long producerId, short epoch, int baseSequence) {
+    public void setProducerState(long producerId, short producerEpoch, int baseSequence) {
         if (isClosed()) {
             // Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
             // If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
@@ -222,7 +222,7 @@ public class MemoryRecordsBuilder {
             throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
         }
         this.producerId = producerId;
-        this.producerEpoch = epoch;
+        this.producerEpoch = producerEpoch;
         this.baseSequence = baseSequence;
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 6de28c3..ab52bef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -38,7 +38,7 @@ public interface Record {
      * Get the sequence number assigned by the producer.
      * @return the sequence number
      */
-    long sequence();
+    int sequence();
 
     /**
      * Get the size in bytes of this record.

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 42b0c2e..db75105 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -96,10 +96,11 @@ public interface RecordBatch extends Iterable<Record> {
     TimestampType timestampType();
 
     /**
-     * Get the first offset contained in this record batch. For magic version prior to 2, this generally
-     * requires deep iteration and will return the offset of the first record in the record batch. For
-     * magic version 2 and above, this will return the first offset of the original record batch (i.e.
-     * prior to compaction). For non-compacted topics, the behavior is equivalent.
+     * Get the base offset contained in this record batch. For magic version prior to 2, the base offset will
+     * always be the offset of the first message in the batch. This generally requires deep iteration and will
+     * return the offset of the first record in the record batch. For magic version 2 and above, this will return
+     * the first offset of the original record batch (i.e. prior to compaction). For non-compacted topics, the
+     * behavior is equivalent.
      *
      * Because this requires deep iteration for older magic versions, this method should be used with
      * caution. Generally {@link #lastOffset()} is safer since access is efficient for all magic versions.
@@ -110,8 +111,9 @@ public interface RecordBatch extends Iterable<Record> {
     long baseOffset();
 
     /**
-     * Get the last offset in this record batch (inclusive). Unlike {@link #baseOffset()}, the last offset
-     * always reflects the offset of the last record in the batch, even after compaction.
+     * Get the last offset in this record batch (inclusive). Just like {@link #baseOffset()}, the last offset
+     * always reflects the offset of the last record in the original batch, even if it is removed during log
+     * compaction.
      *
      * @return The offset of the last record in this batch
      */
@@ -132,7 +134,7 @@ public interface RecordBatch extends Iterable<Record> {
     byte magic();
 
     /**
-     * Get the producer id for this log record batch. For older magic versions, this will return 0.
+     * Get the producer id for this log record batch. For older magic versions, this will return -1.
      *
      * @return The producer id or -1 if there is none
      */
@@ -151,13 +153,17 @@ public interface RecordBatch extends Iterable<Record> {
     boolean hasProducerId();
 
     /**
-     * Get the first sequence number of this record batch.
+     * Get the base sequence number of this record batch. Like {@link #baseOffset()}, this value is not
+     * affected by compaction: it always retains the base sequence number from the original batch.
+     *
      * @return The first sequence number or -1 if there is none
      */
     int baseSequence();
 
     /**
-     * Get the last sequence number of this record batch.
+     * Get the last sequence number of this record batch. Like {@link #lastOffset()}, the last sequence number
+     * always reflects the sequence number of the last record in the original batch, even if it is removed during log
+     * compaction.
      *
      * @return The last sequence number or -1 if there is none
      */

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 5a34f0f..afd0126 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -302,6 +302,50 @@ public class MemoryRecordsTest {
     }
 
     @Test
+    public void testFilterToAlreadyCompactedLog() {
+        ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+        // create a batch with some offset gaps to simulate a compacted batch
+        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
+                TimestampType.CREATE_TIME, 0L);
+        builder.appendWithOffset(5L, 10L, null, "a".getBytes());
+        builder.appendWithOffset(8L, 11L, "1".getBytes(), "b".getBytes());
+        builder.appendWithOffset(10L, 12L, null, "c".getBytes());
+
+        builder.close();
+        buffer.flip();
+
+        ByteBuffer filtered = ByteBuffer.allocate(2048);
+        MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+        filtered.flip();
+        MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+        List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+        assertEquals(1, batches.size());
+
+        MutableRecordBatch batch = batches.get(0);
+        List<Record> records = TestUtils.toList(batch);
+        assertEquals(1, records.size());
+        assertEquals(8L, records.get(0).offset());
+
+
+        if (magic >= RecordBatch.MAGIC_VALUE_V1)
+            assertEquals(new SimpleRecord(11L, "1".getBytes(), "b".getBytes()), new SimpleRecord(records.get(0)));
+        else
+            assertEquals(new SimpleRecord(RecordBatch.NO_TIMESTAMP, "1".getBytes(), "b".getBytes()),
+                    new SimpleRecord(records.get(0)));
+
+        if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+            // the new format preserves first and last offsets from the original batch
+            assertEquals(0L, batch.baseOffset());
+            assertEquals(10L, batch.lastOffset());
+        } else {
+            assertEquals(8L, batch.baseOffset());
+            assertEquals(8L, batch.lastOffset());
+        }
+    }
+
+    @Test
     public void testFilterToPreservesProducerInfo() {
         if (magic >= RecordBatch.MAGIC_VALUE_V2) {
             ByteBuffer buffer = ByteBuffer.allocate(2048);
@@ -332,8 +376,8 @@ public class MemoryRecordsTest {
             builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L,
                     RecordBatch.NO_TIMESTAMP, pid2, epoch2, baseSequence2, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
             builder.append(16L, "6".getBytes(), "g".getBytes());
-            builder.append(17L, null, "h".getBytes());
-            builder.append(18L, "8".getBytes(), "i".getBytes());
+            builder.append(17L, "7".getBytes(), "h".getBytes());
+            builder.append(18L, null, "i".getBytes());
             builder.close();
 
             buffer.flip();
@@ -356,6 +400,10 @@ public class MemoryRecordsTest {
             assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.baseSequence());
             assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.lastSequence());
             assertFalse(firstBatch.isTransactional());
+            List<Record> firstBatchRecords = TestUtils.toList(firstBatch);
+            assertEquals(1, firstBatchRecords.size());
+            assertEquals(RecordBatch.NO_SEQUENCE, firstBatchRecords.get(0).sequence());
+            assertEquals(new SimpleRecord(11L, "1".getBytes(), "b".getBytes()), new SimpleRecord(firstBatchRecords.get(0)));
 
             MutableRecordBatch secondBatch = batches.get(1);
             assertEquals(2, secondBatch.countOrNull().intValue());
@@ -366,6 +414,12 @@ public class MemoryRecordsTest {
             assertEquals(baseSequence1, secondBatch.baseSequence());
             assertEquals(baseSequence1 + 2, secondBatch.lastSequence());
             assertFalse(secondBatch.isTransactional());
+            List<Record> secondBatchRecords = TestUtils.toList(secondBatch);
+            assertEquals(2, secondBatchRecords.size());
+            assertEquals(baseSequence1 + 1, secondBatchRecords.get(0).sequence());
+            assertEquals(new SimpleRecord(14L, "4".getBytes(), "e".getBytes()), new SimpleRecord(secondBatchRecords.get(0)));
+            assertEquals(baseSequence1 + 2, secondBatchRecords.get(1).sequence());
+            assertEquals(new SimpleRecord(15L, "5".getBytes(), "f".getBytes()), new SimpleRecord(secondBatchRecords.get(1)));
 
             MutableRecordBatch thirdBatch = batches.get(2);
             assertEquals(2, thirdBatch.countOrNull().intValue());
@@ -376,6 +430,12 @@ public class MemoryRecordsTest {
             assertEquals(baseSequence2, thirdBatch.baseSequence());
             assertEquals(baseSequence2 + 2, thirdBatch.lastSequence());
             assertTrue(thirdBatch.isTransactional());
+            List<Record> thirdBatchRecords = TestUtils.toList(thirdBatch);
+            assertEquals(2, thirdBatchRecords.size());
+            assertEquals(baseSequence2, thirdBatchRecords.get(0).sequence());
+            assertEquals(new SimpleRecord(16L, "6".getBytes(), "g".getBytes()), new SimpleRecord(thirdBatchRecords.get(0)));
+            assertEquals(baseSequence2 + 1, thirdBatchRecords.get(1).sequence());
+            assertEquals(new SimpleRecord(17L, "7".getBytes(), "h".getBytes()), new SimpleRecord(thirdBatchRecords.get(1)));
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 55eb46a..67b9271 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -56,7 +56,8 @@ object LogAppendInfo {
 /**
  * Struct to hold various quantities we compute about each message set before appending to the log
  *
- * @param firstOffset The first offset in the message set
+ * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
+ *                    to the follower. In that case, this will be the last offset for performance reasons.
  * @param lastOffset The last offset in the message set
  * @param maxTimestamp The maximum timestamp of the message set.
  * @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
@@ -750,6 +751,8 @@ class Log(@volatile var dir: File,
       // update the first offset if on the first message. For magic versions older than 2, we use the last offset
       // to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
       // For magic version 2, we can get the first offset directly from the batch header.
+      // When appending to the leader, we will update LogAppendInfo.baseOffset with the correct value. In the follower
+      // case, validation will be more lenient.
       if (firstOffset < 0)
         firstOffset = if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) batch.baseOffset else batch.lastOffset
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 6eb65ca..a280679 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -19,7 +19,7 @@ package kafka.log
 
 import java.io.File
 import java.nio._
-import java.nio.file.Paths
+import java.nio.file.{Files, Paths}
 import java.util.Properties
 
 import kafka.common._
@@ -39,7 +39,7 @@ import scala.collection._
  * Unit tests for the log cleaning logic
  */
 class LogCleanerTest extends JUnitSuite {
-  
+
   val tmpdir = TestUtils.tempDir()
   val dir = TestUtils.randomPartitionLogDir(tmpdir)
   val logProps = new Properties()
@@ -50,12 +50,12 @@ class LogCleanerTest extends JUnitSuite {
   val logConfig = LogConfig(logProps)
   val time = new MockTime()
   val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
-  
+
   @After
   def teardown(): Unit = {
     Utils.delete(tmpdir)
   }
-  
+
   /**
    * Test simple log cleaning
    */
@@ -89,6 +89,66 @@ class LogCleanerTest extends JUnitSuite {
   }
 
   @Test
+  def testDuplicateCheckAfterCleaning(): Unit = {
+    val cleaner = makeCleaner(Int.MaxValue)
+    val logProps = new Properties()
+    logProps.put(LogConfig.SegmentBytesProp, 2048: java.lang.Integer)
+    var log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+    val producerEpoch = 0.toShort
+    val pid1 = 1
+    val pid2 = 2
+    val pid3 = 3
+    val pid4 = 4
+
+    appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
+    appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
+    appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
+
+    log.roll()
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+    assertEquals(List(2, 3, 3, 4, 1, 4), keysInLog(log))
+    assertEquals(List(1, 2, 3, 5, 6, 7), offsetsInLog(log))
+
+    // we have to reload the log to validate that the cleaner maintained sequence numbers correctly
+    def reloadLog(): Unit = {
+      log.close()
+      log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps), recoveryPoint = 0L)
+    }
+
+    reloadLog()
+
+    // check duplicate append from producer 1
+    var logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
+    assertEquals(0L, logAppendInfo.firstOffset)
+    assertEquals(2L, logAppendInfo.lastOffset)
+
+    // check duplicate append from producer 3
+    logAppendInfo = appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
+    assertEquals(6L, logAppendInfo.firstOffset)
+    assertEquals(7L, logAppendInfo.lastOffset)
+
+    // check duplicate append from producer 2
+    logAppendInfo = appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
+    assertEquals(3L, logAppendInfo.firstOffset)
+    assertEquals(5L, logAppendInfo.lastOffset)
+
+    // do one more append and a round of cleaning to force another deletion from producer 1's batch
+    appendIdempotentAsLeader(log, pid4, producerEpoch)(Seq(2))
+    log.roll()
+    cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+    assertEquals(List(3, 3, 4, 1, 4, 2), keysInLog(log))
+    assertEquals(List(2, 3, 5, 6, 7, 8), offsetsInLog(log))
+
+    reloadLog()
+
+    // duplicate append from producer1 should still be fine
+    logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
+    assertEquals(0L, logAppendInfo.firstOffset)
+    assertEquals(2L, logAppendInfo.lastOffset)
+  }
+
+  @Test
   def testBasicTransactionAwareCleaning(): Unit = {
     val cleaner = makeCleaner(Int.MaxValue)
     val logProps = new Properties()
@@ -974,8 +1034,8 @@ class LogCleanerTest extends JUnitSuite {
   private def messageWithOffset(key: Int, value: Int, offset: Long): MemoryRecords =
     messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
 
-  private def makeLog(dir: File = dir, config: LogConfig = logConfig) =
-    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+  private def makeLog(dir: File = dir, config: LogConfig = logConfig, recoveryPoint: Long = 0L) =
+    new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = recoveryPoint, scheduler = time.scheduler,
       time = time, brokerTopicStats = new BrokerTopicStats)
 
   private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */  }
@@ -1006,23 +1066,25 @@ class LogCleanerTest extends JUnitSuite {
       partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
   }
 
-  private def transactionalRecords(records: Seq[SimpleRecord],
-                           producerId: Long,
-                           producerEpoch: Short,
-                           sequence: Int): MemoryRecords = {
-    MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, records: _*)
+  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => LogAppendInfo = {
+    appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true)
   }
 
-  private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => Unit = {
+  private def appendIdempotentAsLeader(log: Log, producerId: Long,
+                                       producerEpoch: Short = 0,
+                                       isTransactional: Boolean = false): Seq[Int] => LogAppendInfo = {
     var sequence = 0
     keys: Seq[Int] => {
       val simpleRecords = keys.map { key =>
         val keyBytes = key.toString.getBytes
-        new SimpleRecord(keyBytes, keyBytes) // the value doesn't matter too much since we validate offsets
+        new SimpleRecord(time.milliseconds(), keyBytes, keyBytes) // the value doesn't matter since we validate offsets
       }
-      val records = transactionalRecords(simpleRecords, producerId, producerEpoch, sequence)
-      log.appendAsLeader(records, leaderEpoch = 0)
+      val records = if (isTransactional)
+        MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
+      else
+        MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
       sequence += simpleRecords.size
+      log.appendAsLeader(records, leaderEpoch = 0)
     }
   }