Posted to commits@kafka.apache.org by ij...@apache.org on 2017/05/26 08:50:43 UTC
kafka git commit: MINOR: Preserve the base offset of the original record batch in V2
Repository: kafka
Updated Branches:
refs/heads/trunk 02c0c3b01 -> 374336382
MINOR: Preserve the base offset of the original record batch in V2
The previous code did not handle this correctly if a batch was
compacted more than once.
Also add test case for duplicate check after log cleaning and
improve various comments.
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3145 from hachikuji/minor-improve-base-sequence-docs
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/37433638
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/37433638
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/37433638
Branch: refs/heads/trunk
Commit: 37433638271718344498d695d5da08db12c24eed
Parents: 02c0c3b
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri May 26 09:41:17 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Fri May 26 09:46:09 2017 +0100
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 4 +-
.../record/AbstractLegacyRecordBatch.java | 2 +-
.../kafka/common/record/DefaultRecord.java | 4 +-
.../kafka/common/record/DefaultRecordBatch.java | 9 +-
.../kafka/common/record/MemoryRecords.java | 9 +-
.../common/record/MemoryRecordsBuilder.java | 4 +-
.../org/apache/kafka/common/record/Record.java | 2 +-
.../apache/kafka/common/record/RecordBatch.java | 24 +++--
.../kafka/common/record/MemoryRecordsTest.java | 64 +++++++++++++-
core/src/main/scala/kafka/log/Log.scala | 5 +-
.../scala/unit/kafka/log/LogCleanerTest.scala | 92 ++++++++++++++++----
11 files changed, 178 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 01bd0e5..cd32850 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1035,9 +1035,9 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
if (containsAbortMarker(currentBatch)) {
abortedProducerIds.remove(producerId);
} else if (isBatchAborted(currentBatch)) {
- log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition: {}",
+ log.trace("Skipping aborted record batch with producerId {} and base offset {}, partition {}",
producerId, currentBatch.baseOffset(), partition);
- nextFetchOffset = currentBatch.lastOffset() + 1;
+ nextFetchOffset = currentBatch.nextOffset();
continue;
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index 6ce3ba3..e028988 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -186,7 +186,7 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
}
@Override
- public long sequence() {
+ public int sequence() {
return RecordBatch.NO_SEQUENCE;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
index 9d0cd7e..05b5bb2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java
@@ -60,7 +60,7 @@ import static org.apache.kafka.common.utils.Utils.wrapNullable;
* ----------------
*
* The offset and timestamp deltas compute the difference relative to the base offset and
- * base timestamp of the log entry that this record is contained in.
+ * base timestamp of the batch that this record is contained in.
*/
public class DefaultRecord implements Record {
@@ -102,7 +102,7 @@ public class DefaultRecord implements Record {
}
@Override
- public long sequence() {
+ public int sequence() {
return sequence;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 13f958d..4e52d61 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -44,7 +44,7 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
* Magic => Int8
* CRC => Uint32
* Attributes => Int16
- * LastOffsetDelta => Int32
+ * LastOffsetDelta => Int32 // also serves as LastSequenceDelta
* BaseTimestamp => Int64
* MaxTimestamp => Int64
* ProducerId => Int64
@@ -61,6 +61,13 @@ import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
* computation to avoid the need to recompute the CRC when this field is assigned for every batch that is received by
* the broker. The CRC-32C (Castagnoli) polynomial is used for the computation.
*
+ * On compaction: unlike the older message formats, magic v2 and above preserves the first and last offset/sequence
+ * numbers from the original batch when the log is cleaned. This is required in order to be able to restore the
+ * producer's state when the log is reloaded. If we did not retain the last sequence number, then after a
+ * partition leader failure, for example, the producer might see an OutOfSequence error. The base sequence
+ * number must be preserved for duplicate checking (the broker checks incoming Produce requests for duplicates
+ * by verifying that the first and last sequence numbers of the incoming batch match those of the last batch
+ * from that producer).
+ *
* The current attributes are given below:
*
* -------------------------------------------------------------------------------------------------
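A minimal standalone sketch (not broker code; the helper and cached-state names are hypothetical) of the duplicate check described in the comment above, showing why the base and last sequence numbers must survive compaction:

    // Hypothetical helper: an incoming batch is a duplicate of the producer's most
    // recently appended batch when both its base and last sequence numbers match
    // the values the broker cached for that producer.
    public class DuplicateCheckSketch {
        static boolean isDuplicate(int incomingBaseSeq, int incomingLastSeq,
                                   int cachedBaseSeq, int cachedLastSeq) {
            return incomingBaseSeq == cachedBaseSeq && incomingLastSeq == cachedLastSeq;
        }

        public static void main(String[] args) {
            // Even after compaction removes records from the stored batch, the v2 header
            // still reports baseSequence=0 and lastSequence=2, so a retried produce
            // request carrying sequences 0..2 is still recognized as a duplicate.
            System.out.println(isDuplicate(0, 2, 0, 2)); // prints: true
        }
    }
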
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 7391e7e..d3bdee2 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -141,13 +141,9 @@ public class MemoryRecords extends AbstractRecords {
byte batchMagic = batch.magic();
boolean writeOriginalEntry = true;
- long firstOffset = -1;
List<Record> retainedRecords = new ArrayList<>();
for (Record record : batch) {
- if (firstOffset < 0)
- firstOffset = record.offset();
-
messagesRead += 1;
if (filter.shouldRetain(batch, record)) {
@@ -178,8 +174,11 @@ public class MemoryRecords extends AbstractRecords {
ByteBuffer slice = destinationBuffer.slice();
TimestampType timestampType = batch.timestampType();
long logAppendTime = timestampType == TimestampType.LOG_APPEND_TIME ? batch.maxTimestamp() : RecordBatch.NO_TIMESTAMP;
+ long baseOffset = batchMagic >= RecordBatch.MAGIC_VALUE_V2 ?
+ batch.baseOffset() : retainedRecords.get(0).offset();
+
MemoryRecordsBuilder builder = builder(slice, batch.magic(), batch.compressionType(), timestampType,
- firstOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(),
+ baseOffset, logAppendTime, batch.producerId(), batch.producerEpoch(), batch.baseSequence(),
batch.isTransactional(), batch.partitionLeaderEpoch());
for (Record record : retainedRecords)
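The baseOffset change above is the heart of the fix described in the commit message. A hedged, self-contained illustration of the repeated-compaction bug (the offsets are made up; the real logic is the filterTo() code in this hunk):

    public class BaseOffsetDriftExample {
        public static void main(String[] args) {
            // A v2 batch is written with base offset 0 and records at offsets 0, 1, 2.
            // The old code took the base offset from the first record it iterated,
            // not from the batch header.

            // First cleaning: the record at offset 0 is iterated before being
            // discarded, so the rebuilt batch still gets base offset 0 and the
            // bug stays hidden.
            long[] offsetsAfterFirstClean = {1L, 2L};

            // Second cleaning: the first record the cleaner now sees is at offset 1,
            // so the rebuilt batch would record base offset 1 while its base sequence
            // still says 0, breaking the offset/sequence correspondence that
            // duplicate checking and producer state recovery rely on.
            long buggyBase = offsetsAfterFirstClean[0]; // 1

            // The fix reads the base offset from the v2 batch header instead, which
            // is preserved across any number of cleanings.
            long fixedBase = 0L;

            System.out.println("buggy: " + buggyBase + ", fixed: " + fixedBase);
        }
    }
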
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index bc25d75..e055aa5 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -213,7 +213,7 @@ public class MemoryRecordsBuilder {
}
}
- public void setProducerState(long producerId, short epoch, int baseSequence) {
+ public void setProducerState(long producerId, short producerEpoch, int baseSequence) {
if (isClosed()) {
// Sequence numbers are assigned when the batch is closed while the accumulator is being drained.
// If the resulting ProduceRequest to the partition leader failed for a retriable error, the batch will
@@ -222,7 +222,7 @@ public class MemoryRecordsBuilder {
throw new IllegalStateException("Trying to set producer state of an already closed batch. This indicates a bug on the client.");
}
this.producerId = producerId;
- this.producerEpoch = epoch;
+ this.producerEpoch = producerEpoch;
this.baseSequence = baseSequence;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/Record.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Record.java b/clients/src/main/java/org/apache/kafka/common/record/Record.java
index 6de28c3..ab52bef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Record.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Record.java
@@ -38,7 +38,7 @@ public interface Record {
* Get the sequence number assigned by the producer.
* @return the sequence number
*/
- long sequence();
+ int sequence();
/**
* Get the size in bytes of this record.
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
index 42b0c2e..db75105 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordBatch.java
@@ -96,10 +96,11 @@ public interface RecordBatch extends Iterable<Record> {
TimestampType timestampType();
/**
- * Get the first offset contained in this record batch. For magic version prior to 2, this generally
- * requires deep iteration and will return the offset of the first record in the record batch. For
- * magic version 2 and above, this will return the first offset of the original record batch (i.e.
- * prior to compaction). For non-compacted topics, the behavior is equivalent.
+ * Get the base offset contained in this record batch. For magic versions prior to 2, the base offset will
+ * always be the offset of the first record in the batch, which generally requires deep iteration to find.
+ * For magic version 2 and above, this will return the first offset of the original record batch (i.e.
+ * prior to compaction). For non-compacted topics, the behavior is equivalent.
*
* Because this requires deep iteration for older magic versions, this method should be used with
* caution. Generally {@link #lastOffset()} is safer since access is efficient for all magic versions.
@@ -110,8 +111,9 @@ public interface RecordBatch extends Iterable<Record> {
long baseOffset();
/**
- * Get the last offset in this record batch (inclusive). Unlike {@link #baseOffset()}, the last offset
- * always reflects the offset of the last record in the batch, even after compaction.
+ * Get the last offset in this record batch (inclusive). Just like {@link #baseOffset()}, the last offset
+ * always reflects the offset of the last record in the original batch, even if it is removed during log
+ * compaction.
*
* @return The offset of the last record in this batch
*/
@@ -132,7 +134,7 @@ public interface RecordBatch extends Iterable<Record> {
byte magic();
/**
- * Get the producer id for this log record batch. For older magic versions, this will return 0.
+ * Get the producer id for this log record batch. For older magic versions, this will return -1.
*
* @return The producer id or -1 if there is none
*/
@@ -151,13 +153,17 @@ public interface RecordBatch extends Iterable<Record> {
boolean hasProducerId();
/**
- * Get the first sequence number of this record batch.
+ * Get the base sequence number of this record batch. Like {@link #baseOffset()}, this value is not
+ * affected by compaction: it always retains the base sequence number from the original batch.
+ *
* @return The first sequence number or -1 if there is none
*/
int baseSequence();
/**
- * Get the last sequence number of this record batch.
+ * Get the last sequence number of this record batch. Like {@link #lastOffset()}, the last sequence number
+ * always reflects the sequence number of the last record in the original batch, even if it is removed during log
+ * compaction.
*
* @return The last sequence number or -1 if there is none
*/
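To tie the offset and sequence documentation together: in the v2 format the header's LastOffsetDelta doubles as the last sequence delta (see the DefaultRecordBatch schema earlier in this diff), so both values can be computed from header fields alone. A small sketch, ignoring the sequence wraparound the real code has to handle:

    public class SequenceDeltaSketch {
        public static void main(String[] args) {
            // Hypothetical v2 batch header values
            long baseOffset = 100L;
            int baseSequence = 10;
            int lastOffsetDelta = 4;

            long lastOffset = baseOffset + lastOffsetDelta;    // 104
            int lastSequence = baseSequence + lastOffsetDelta; // 14

            // Both survive compaction because they come from the header, not from
            // whichever records happen to remain in the batch.
            System.out.println("lastOffset=" + lastOffset + ", lastSequence=" + lastSequence);
        }
    }
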
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 5a34f0f..afd0126 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -302,6 +302,50 @@ public class MemoryRecordsTest {
}
@Test
+ public void testFilterToAlreadyCompactedLog() {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+ // create a batch with some offset gaps to simulate a compacted batch
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
+ TimestampType.CREATE_TIME, 0L);
+ builder.appendWithOffset(5L, 10L, null, "a".getBytes());
+ builder.appendWithOffset(8L, 11L, "1".getBytes(), "b".getBytes());
+ builder.appendWithOffset(10L, 12L, null, "c".getBytes());
+
+ builder.close();
+ buffer.flip();
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ MemoryRecords.readableRecords(buffer).filterTo(new RetainNonNullKeysFilter(), filtered);
+ filtered.flip();
+ MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+ List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+ assertEquals(1, batches.size());
+
+ MutableRecordBatch batch = batches.get(0);
+ List<Record> records = TestUtils.toList(batch);
+ assertEquals(1, records.size());
+ assertEquals(8L, records.get(0).offset());
+
+ if (magic >= RecordBatch.MAGIC_VALUE_V1)
+ assertEquals(new SimpleRecord(11L, "1".getBytes(), "b".getBytes()), new SimpleRecord(records.get(0)));
+ else
+ assertEquals(new SimpleRecord(RecordBatch.NO_TIMESTAMP, "1".getBytes(), "b".getBytes()),
+ new SimpleRecord(records.get(0)));
+
+ if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+ // the new format preserves first and last offsets from the original batch
+ assertEquals(0L, batch.baseOffset());
+ assertEquals(10L, batch.lastOffset());
+ } else {
+ assertEquals(8L, batch.baseOffset());
+ assertEquals(8L, batch.lastOffset());
+ }
+ }
+
+ @Test
public void testFilterToPreservesProducerInfo() {
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
ByteBuffer buffer = ByteBuffer.allocate(2048);
@@ -332,8 +376,8 @@ public class MemoryRecordsTest {
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L,
RecordBatch.NO_TIMESTAMP, pid2, epoch2, baseSequence2, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
builder.append(16L, "6".getBytes(), "g".getBytes());
- builder.append(17L, null, "h".getBytes());
- builder.append(18L, "8".getBytes(), "i".getBytes());
+ builder.append(17L, "7".getBytes(), "h".getBytes());
+ builder.append(18L, null, "i".getBytes());
builder.close();
buffer.flip();
@@ -356,6 +400,10 @@ public class MemoryRecordsTest {
assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.baseSequence());
assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.lastSequence());
assertFalse(firstBatch.isTransactional());
+ List<Record> firstBatchRecords = TestUtils.toList(firstBatch);
+ assertEquals(1, firstBatchRecords.size());
+ assertEquals(RecordBatch.NO_SEQUENCE, firstBatchRecords.get(0).sequence());
+ assertEquals(new SimpleRecord(11L, "1".getBytes(), "b".getBytes()), new SimpleRecord(firstBatchRecords.get(0)));
MutableRecordBatch secondBatch = batches.get(1);
assertEquals(2, secondBatch.countOrNull().intValue());
@@ -366,6 +414,12 @@ public class MemoryRecordsTest {
assertEquals(baseSequence1, secondBatch.baseSequence());
assertEquals(baseSequence1 + 2, secondBatch.lastSequence());
assertFalse(secondBatch.isTransactional());
+ List<Record> secondBatchRecords = TestUtils.toList(secondBatch);
+ assertEquals(2, secondBatchRecords.size());
+ assertEquals(baseSequence1 + 1, secondBatchRecords.get(0).sequence());
+ assertEquals(new SimpleRecord(14L, "4".getBytes(), "e".getBytes()), new SimpleRecord(secondBatchRecords.get(0)));
+ assertEquals(baseSequence1 + 2, secondBatchRecords.get(1).sequence());
+ assertEquals(new SimpleRecord(15L, "5".getBytes(), "f".getBytes()), new SimpleRecord(secondBatchRecords.get(1)));
MutableRecordBatch thirdBatch = batches.get(2);
assertEquals(2, thirdBatch.countOrNull().intValue());
@@ -376,6 +430,12 @@ public class MemoryRecordsTest {
assertEquals(baseSequence2, thirdBatch.baseSequence());
assertEquals(baseSequence2 + 2, thirdBatch.lastSequence());
assertTrue(thirdBatch.isTransactional());
+ List<Record> thirdBatchRecords = TestUtils.toList(thirdBatch);
+ assertEquals(2, thirdBatchRecords.size());
+ assertEquals(baseSequence2, thirdBatchRecords.get(0).sequence());
+ assertEquals(new SimpleRecord(16L, "6".getBytes(), "g".getBytes()), new SimpleRecord(thirdBatchRecords.get(0)));
+ assertEquals(baseSequence2 + 1, thirdBatchRecords.get(1).sequence());
+ assertEquals(new SimpleRecord(17L, "7".getBytes(), "h".getBytes()), new SimpleRecord(thirdBatchRecords.get(1)));
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index 55eb46a..67b9271 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -56,7 +56,8 @@ object LogAppendInfo {
/**
* Struct to hold various quantities we compute about each message set before appending to the log
*
- * @param firstOffset The first offset in the message set
+ * @param firstOffset The first offset in the message set unless the message format is less than V2 and we are appending
+ * to the follower. In that case, this will be the last offset for performance reasons.
* @param lastOffset The last offset in the message set
* @param maxTimestamp The maximum timestamp of the message set.
* @param offsetOfMaxTimestamp The offset of the message with the maximum timestamp.
@@ -750,6 +751,8 @@ class Log(@volatile var dir: File,
// update the first offset if on the first message. For magic versions older than 2, we use the last offset
// to avoid the need to decompress the data (the last offset can be obtained directly from the wrapper message).
// For magic version 2, we can get the first offset directly from the batch header.
+ // When appending to the leader, we will update LogAppendInfo.firstOffset with the correct value. In the follower
+ // case, validation will be more lenient.
if (firstOffset < 0)
firstOffset = if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) batch.baseOffset else batch.lastOffset
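A minimal restatement of the branch above with plain values (MAGIC_VALUE_V2 is 2): v2 batches expose the base offset in the header, while older wrapper messages would need to be decompressed to find the first inner offset, so the cheaper last offset is used instead:

    public class FirstOffsetSelectionSketch {
        static long firstOffsetFor(byte magic, long baseOffset, long lastOffset) {
            return magic >= 2 ? baseOffset : lastOffset;
        }

        public static void main(String[] args) {
            System.out.println(firstOffsetFor((byte) 2, 100L, 104L)); // 100: header base offset
            System.out.println(firstOffsetFor((byte) 1, 100L, 104L)); // 104: avoids decompression
        }
    }
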
http://git-wip-us.apache.org/repos/asf/kafka/blob/37433638/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 6eb65ca..a280679 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -19,7 +19,7 @@ package kafka.log
import java.io.File
import java.nio._
-import java.nio.file.Paths
+import java.nio.file.{Files, Paths}
import java.util.Properties
import kafka.common._
@@ -39,7 +39,7 @@ import scala.collection._
* Unit tests for the log cleaning logic
*/
class LogCleanerTest extends JUnitSuite {
-
+
val tmpdir = TestUtils.tempDir()
val dir = TestUtils.randomPartitionLogDir(tmpdir)
val logProps = new Properties()
@@ -50,12 +50,12 @@ class LogCleanerTest extends JUnitSuite {
val logConfig = LogConfig(logProps)
val time = new MockTime()
val throttler = new Throttler(desiredRatePerSec = Double.MaxValue, checkIntervalMs = Long.MaxValue, time = time)
-
+
@After
def teardown(): Unit = {
Utils.delete(tmpdir)
}
-
+
/**
* Test simple log cleaning
*/
@@ -89,6 +89,66 @@ class LogCleanerTest extends JUnitSuite {
}
@Test
+ def testDuplicateCheckAfterCleaning(): Unit = {
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 2048: java.lang.Integer)
+ var log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val producerEpoch = 0.toShort
+ val pid1 = 1
+ val pid2 = 2
+ val pid3 = 3
+ val pid4 = 4
+
+ appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
+ appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
+ appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
+
+ log.roll()
+ cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+ assertEquals(List(2, 3, 3, 4, 1, 4), keysInLog(log))
+ assertEquals(List(1, 2, 3, 5, 6, 7), offsetsInLog(log))
+
+ // we have to reload the log to validate that the cleaner maintained sequence numbers correctly
+ def reloadLog(): Unit = {
+ log.close()
+ log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps), recoveryPoint = 0L)
+ }
+
+ reloadLog()
+
+ // check duplicate append from producer 1
+ var logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
+ assertEquals(0L, logAppendInfo.firstOffset)
+ assertEquals(2L, logAppendInfo.lastOffset)
+
+ // check duplicate append from producer 3
+ logAppendInfo = appendIdempotentAsLeader(log, pid3, producerEpoch)(Seq(1, 4))
+ assertEquals(6L, logAppendInfo.firstOffset)
+ assertEquals(7L, logAppendInfo.lastOffset)
+
+ // check duplicate append from producer 2
+ logAppendInfo = appendIdempotentAsLeader(log, pid2, producerEpoch)(Seq(3, 1, 4))
+ assertEquals(3L, logAppendInfo.firstOffset)
+ assertEquals(5L, logAppendInfo.lastOffset)
+
+ // do one more append and a round of cleaning to force another deletion from producer 1's batch
+ appendIdempotentAsLeader(log, pid4, producerEpoch)(Seq(2))
+ log.roll()
+ cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+ assertEquals(List(3, 3, 4, 1, 4, 2), keysInLog(log))
+ assertEquals(List(2, 3, 5, 6, 7, 8), offsetsInLog(log))
+
+ reloadLog()
+
+ // duplicate append from producer 1 should still be fine
+ logAppendInfo = appendIdempotentAsLeader(log, pid1, producerEpoch)(Seq(1, 2, 3))
+ assertEquals(0L, logAppendInfo.firstOffset)
+ assertEquals(2L, logAppendInfo.lastOffset)
+ }
+
+ @Test
def testBasicTransactionAwareCleaning(): Unit = {
val cleaner = makeCleaner(Int.MaxValue)
val logProps = new Properties()
@@ -974,8 +1034,8 @@ class LogCleanerTest extends JUnitSuite {
private def messageWithOffset(key: Int, value: Int, offset: Long): MemoryRecords =
messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
- private def makeLog(dir: File = dir, config: LogConfig = logConfig) =
- new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler,
+ private def makeLog(dir: File = dir, config: LogConfig = logConfig, recoveryPoint: Long = 0L) =
+ new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = recoveryPoint, scheduler = time.scheduler,
time = time, brokerTopicStats = new BrokerTopicStats)
private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ }
@@ -1006,23 +1066,25 @@ class LogCleanerTest extends JUnitSuite {
partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
}
- private def transactionalRecords(records: Seq[SimpleRecord],
- producerId: Long,
- producerEpoch: Short,
- sequence: Int): MemoryRecords = {
- MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, records: _*)
+ private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => LogAppendInfo = {
+ appendIdempotentAsLeader(log, producerId, producerEpoch, isTransactional = true)
}
- private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => Unit = {
+ private def appendIdempotentAsLeader(log: Log, producerId: Long,
+ producerEpoch: Short = 0,
+ isTransactional: Boolean = false): Seq[Int] => LogAppendInfo = {
var sequence = 0
keys: Seq[Int] => {
val simpleRecords = keys.map { key =>
val keyBytes = key.toString.getBytes
- new SimpleRecord(keyBytes, keyBytes) // the value doesn't matter too much since we validate offsets
+ new SimpleRecord(time.milliseconds(), keyBytes, keyBytes) // the value doesn't matter since we validate offsets
}
- val records = transactionalRecords(simpleRecords, producerId, producerEpoch, sequence)
- log.appendAsLeader(records, leaderEpoch = 0)
+ val records = if (isTransactional)
+ MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
+ else
+ MemoryRecords.withIdempotentRecords(CompressionType.NONE, producerId, producerEpoch, sequence, simpleRecords: _*)
sequence += simpleRecords.size
+ log.appendAsLeader(records, leaderEpoch = 0)
}
}