You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/12 19:32:52 UTC
[1/2] kafka git commit: KAFKA-5196; Make LogCleaner transaction-aware
Repository: kafka
Updated Branches:
refs/heads/trunk 1c2bbaa50 -> 7baa58d79
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/test/scala/unit/kafka/log/LogTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala b/core/src/test/scala/unit/kafka/log/LogTest.scala
index b4fe9fb..2283077 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -375,9 +375,10 @@ class LogTest {
producerId: Long,
epoch: Short,
offset: Long = 0L,
- coordinatorEpoch: Int = 0): MemoryRecords = {
+ coordinatorEpoch: Int = 0,
+ partitionLeaderEpoch: Int = 0): MemoryRecords = {
val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch)
- MemoryRecords.withEndTransactionMarker(offset, producerId, epoch, marker)
+ MemoryRecords.withEndTransactionMarker(offset, time.milliseconds(), partitionLeaderEpoch, producerId, epoch, marker)
}
@Test
@@ -2382,22 +2383,22 @@ class LogTest {
private def allAbortedTransactions(log: Log) = log.logSegments.flatMap(_.txnIndex.allAbortedTxns)
- private def appendTransactionalAsLeader(log: Log, pid: Long, producerEpoch: Short): Int => Unit = {
+ private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short): Int => Unit = {
var sequence = 0
numRecords: Int => {
val simpleRecords = (sequence until sequence + numRecords).map { seq =>
new SimpleRecord(s"$seq".getBytes)
}
- val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, pid,
+ val records = MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId,
producerEpoch, sequence, simpleRecords: _*)
log.appendAsLeader(records, leaderEpoch = 0)
sequence += numRecords
}
}
- private def appendEndTxnMarkerAsLeader(log: Log, pid: Long, producerEpoch: Short,
+ private def appendEndTxnMarkerAsLeader(log: Log, producerId: Long, producerEpoch: Short,
controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = {
- val records = endTxnRecords(controlType, pid, producerEpoch, coordinatorEpoch = coordinatorEpoch)
+ val records = endTxnRecords(controlType, producerId, producerEpoch, coordinatorEpoch = coordinatorEpoch)
log.appendAsLeader(records, isFromClient = false, leaderEpoch = 0)
}
@@ -2409,10 +2410,10 @@ class LogTest {
log.appendAsLeader(records, leaderEpoch = 0)
}
- private def appendTransactionalToBuffer(buffer: ByteBuffer, pid: Long, epoch: Short): (Long, Int) => Unit = {
+ private def appendTransactionalToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short): (Long, Int) => Unit = {
var sequence = 0
(offset: Long, numRecords: Int) => {
- val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, pid, epoch, sequence, true)
+ val builder = MemoryRecords.builder(buffer, CompressionType.NONE, offset, producerId, producerEpoch, sequence, true)
for (seq <- sequence until sequence + numRecords) {
val record = new SimpleRecord(s"$seq".getBytes)
builder.append(record)
@@ -2424,9 +2425,9 @@ class LogTest {
}
private def appendEndTxnMarkerToBuffer(buffer: ByteBuffer, producerId: Long, producerEpoch: Short, offset: Long,
- controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = {
+ controlType: ControlRecordType, coordinatorEpoch: Int = 0): Unit = {
val marker = new EndTransactionMarker(controlType, coordinatorEpoch)
- MemoryRecords.writeEndTransactionalMarker(buffer, offset, producerId, producerEpoch, marker)
+ MemoryRecords.writeEndTransactionalMarker(buffer, offset, time.milliseconds(), 0, producerId, producerEpoch, marker)
}
private def appendNonTransactionalToBuffer(buffer: ByteBuffer, offset: Long, numRecords: Int): Unit = {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
index 4546818..16173eb 100644
--- a/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
+++ b/core/src/test/scala/unit/kafka/log/TransactionIndexTest.scala
@@ -85,15 +85,13 @@ class TransactionIndexTest extends JUnitSuite {
@Test
def testCollectAbortedTransactions(): Unit = {
- val abortedTxns = List(
+ val abortedTransactions = List(
new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 11),
new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 13),
new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
- abortedTxns.foreach(index.append)
-
- val abortedTransactions = abortedTxns.map(_.asAbortedTransaction)
+ abortedTransactions.foreach(index.append)
var result = index.collectAbortedTxns(0L, 100L)
assertEquals(abortedTransactions, result.abortedTransactions)
@@ -122,14 +120,13 @@ class TransactionIndexTest extends JUnitSuite {
@Test
def testTruncate(): Unit = {
- val abortedTxns = List(
+ val abortedTransactions = List(
new AbortedTxn(producerId = 0L, firstOffset = 0, lastOffset = 10, lastStableOffset = 2),
new AbortedTxn(producerId = 1L, firstOffset = 5, lastOffset = 15, lastStableOffset = 16),
new AbortedTxn(producerId = 2L, firstOffset = 18, lastOffset = 35, lastStableOffset = 25),
new AbortedTxn(producerId = 3L, firstOffset = 32, lastOffset = 50, lastStableOffset = 40))
- val abortedTransactions = abortedTxns.map(_.asAbortedTransaction)
- abortedTxns.foreach(index.append)
+ abortedTransactions.foreach(index.append)
index.truncateTo(51)
assertEquals(abortedTransactions, index.collectAbortedTxns(0L, 100L).abortedTransactions)
[2/2] kafka git commit: KAFKA-5196; Make LogCleaner transaction-aware
Posted by jg...@apache.org.
KAFKA-5196; Make LogCleaner transaction-aware
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Jun Rao <ju...@gmail.com>
Closes #3008 from hachikuji/KAFKA-5196
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/7baa58d7
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/7baa58d7
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/7baa58d7
Branch: refs/heads/trunk
Commit: 7baa58d797126b6fb2b1de30e72428895d2bcb40
Parents: 1c2bbaa
Author: Jason Gustafson <ja...@confluent.io>
Authored: Fri May 12 12:07:22 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Fri May 12 12:07:22 2017 -0700
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 38 ++-
.../kafka/common/record/MemoryRecords.java | 51 +++-
.../clients/consumer/internals/FetcherTest.java | 189 ++++++++++---
.../common/record/MemoryRecordsBuilderTest.java | 4 +-
.../kafka/common/record/MemoryRecordsTest.java | 57 +++-
core/src/main/scala/kafka/log/Log.scala | 30 +-
core/src/main/scala/kafka/log/LogCleaner.scala | 257 +++++++++++++----
.../scala/kafka/log/LogCleanerManager.scala | 29 +-
core/src/main/scala/kafka/log/OffsetMap.scala | 5 +
.../scala/kafka/log/ProducerStateManager.scala | 2 +-
.../main/scala/kafka/log/TransactionIndex.scala | 13 +-
.../unit/kafka/log/LogCleanerManagerTest.scala | 48 ++++
.../scala/unit/kafka/log/LogCleanerTest.scala | 280 ++++++++++++++++---
.../scala/unit/kafka/log/LogSegmentTest.scala | 26 +-
.../src/test/scala/unit/kafka/log/LogTest.scala | 21 +-
.../unit/kafka/log/TransactionIndexTest.scala | 11 +-
16 files changed, 840 insertions(+), 221 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index 66221c0..e4365da 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -1015,6 +1015,11 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
maybeEnsureValid(currentBatch);
if (isolationLevel == IsolationLevel.READ_COMMITTED && currentBatch.hasProducerId()) {
+ // remove from the aborted transaction queue all aborted transactions which have begun
+ // before the current batch's last offset and add the associated producerIds to the
+ // aborted producer set
+ consumeAbortedTransactionsUpTo(currentBatch.lastOffset());
+
long producerId = currentBatch.producerId();
if (containsAbortMarker(currentBatch)) {
abortedProducerIds.remove(producerId);
@@ -1072,29 +1077,18 @@ public class Fetcher<K, V> implements SubscriptionState.Listener, Closeable {
return records;
}
- private boolean isBatchAborted(RecordBatch batch) {
- /* When in READ_COMMITTED mode, we need to do the following for each incoming entry:
- * 0. Check whether the pid is in the 'abortedProducerIds' set && the entry does not include an abort marker.
- * If so, skip the entry.
- * 1. If the pid is in aborted pids and the entry contains an abort marker, remove the pid from
- * aborted pids and skip the entry.
- * 2. Check lowest offset entry in the abort index. If the PID of the current entry matches the
- * pid of the abort index entry, and the incoming offset is no smaller than the abort index offset,
- * this means that the entry has been aborted. Add the pid to the aborted pids set, and remove
- * the entry from the abort index.
- */
- long producerId = batch.producerId();
- if (abortedProducerIds.contains(producerId)) {
- return true;
- } else if (abortedTransactions != null && !abortedTransactions.isEmpty()) {
- FetchResponse.AbortedTransaction nextAbortedTransaction = abortedTransactions.peek();
- if (nextAbortedTransaction.producerId == producerId && nextAbortedTransaction.firstOffset <= batch.baseOffset()) {
- abortedProducerIds.add(producerId);
- abortedTransactions.poll();
- return true;
- }
+ private void consumeAbortedTransactionsUpTo(long offset) {
+ if (abortedTransactions == null)
+ return;
+
+ while (!abortedTransactions.isEmpty() && abortedTransactions.peek().firstOffset <= offset) {
+ FetchResponse.AbortedTransaction abortedTransaction = abortedTransactions.poll();
+ abortedProducerIds.add(abortedTransaction.producerId);
}
- return false;
+ }
+
+ private boolean isBatchAborted(RecordBatch batch) {
+ return batch.isTransactional() && abortedProducerIds.contains(batch.producerId());
}
private PriorityQueue<FetchResponse.AbortedTransaction> abortedTransactions(FetchResponse.PartitionData partition) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index c8754c7..a222cc3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -131,6 +131,9 @@ public class MemoryRecords extends AbstractRecords {
for (MutableRecordBatch batch : batches) {
bytesRead += batch.sizeInBytes();
+ if (filter.shouldDiscard(batch))
+ continue;
+
// We use the absolute offset to decide whether to retain the message or not. Due to KAFKA-4298, we have to
// allow for the possibility that a previous version corrupted the log by writing a compressed record batch
// with a magic value not matching the magic of the records (magic < 2). This will be fixed as we
@@ -251,8 +254,21 @@ public class MemoryRecords extends AbstractRecords {
return buffer.hashCode();
}
- public interface RecordFilter {
- boolean shouldRetain(RecordBatch recordBatch, Record record);
+ public static abstract class RecordFilter {
+ /**
+ * Check whether the full batch can be discarded (i.e. whether we even need to
+ * check the records individually).
+ */
+ protected boolean shouldDiscard(RecordBatch batch) {
+ return false;
+ }
+
+ /**
+ * Check whether a record should be retained in the log. Only records from
+ * batches which were not discarded with {@link #shouldDiscard(RecordBatch)}
+ * will be considered.
+ */
+ protected abstract boolean shouldRetain(RecordBatch recordBatch, Record record);
}
public static class FilterResult {
@@ -432,9 +448,10 @@ public class MemoryRecords extends AbstractRecords {
}
public static MemoryRecords withTransactionalRecords(long initialOffset, CompressionType compressionType, long producerId,
- short producerEpoch, int baseSequence, SimpleRecord... records) {
+ short producerEpoch, int baseSequence, int partitionLeaderEpoch,
+ SimpleRecord... records) {
return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
- producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records);
+ producerId, producerEpoch, baseSequence, partitionLeaderEpoch, true, records);
}
public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
@@ -464,28 +481,38 @@ public class MemoryRecords extends AbstractRecords {
}
public static MemoryRecords withEndTransactionMarker(long producerId, short producerEpoch, EndTransactionMarker marker) {
- return withEndTransactionMarker(0L, producerId, producerEpoch, marker);
+ return withEndTransactionMarker(0L, System.currentTimeMillis(), RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ producerId, producerEpoch, marker);
+ }
+
+ public static MemoryRecords withEndTransactionMarker(long timestamp, long producerId, short producerEpoch,
+ EndTransactionMarker marker) {
+ return withEndTransactionMarker(0L, timestamp, RecordBatch.NO_PARTITION_LEADER_EPOCH, producerId,
+ producerEpoch, marker);
}
- public static MemoryRecords withEndTransactionMarker(long initialOffset, long producerId, short producerEpoch,
+ public static MemoryRecords withEndTransactionMarker(long initialOffset, long timestamp, int partitionLeaderEpoch,
+ long producerId, short producerEpoch,
EndTransactionMarker marker) {
int endTxnMarkerBatchSize = DefaultRecordBatch.RECORD_BATCH_OVERHEAD +
EndTransactionMarker.CURRENT_END_TXN_SCHEMA_RECORD_SIZE;
ByteBuffer buffer = ByteBuffer.allocate(endTxnMarkerBatchSize);
- writeEndTransactionalMarker(buffer, initialOffset, producerId, producerEpoch, marker);
+ writeEndTransactionalMarker(buffer, initialOffset, timestamp, partitionLeaderEpoch, producerId,
+ producerEpoch, marker);
buffer.flip();
return MemoryRecords.readableRecords(buffer);
}
- public static void writeEndTransactionalMarker(ByteBuffer buffer, long initialOffset, long producerId,
- short producerEpoch, EndTransactionMarker marker) {
+ public static void writeEndTransactionalMarker(ByteBuffer buffer, long initialOffset, long timestamp,
+ int partitionLeaderEpoch, long producerId, short producerEpoch,
+ EndTransactionMarker marker) {
boolean isTransactional = true;
boolean isControlBatch = true;
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
- TimestampType.CREATE_TIME, initialOffset, RecordBatch.NO_TIMESTAMP, producerId, producerEpoch,
- RecordBatch.NO_SEQUENCE, isTransactional, isControlBatch, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ TimestampType.CREATE_TIME, initialOffset, timestamp, producerId, producerEpoch,
+ RecordBatch.NO_SEQUENCE, isTransactional, isControlBatch, partitionLeaderEpoch,
buffer.capacity());
- builder.appendEndTxnMarker(System.currentTimeMillis(), marker);
+ builder.appendEndTxnMarker(timestamp, marker);
builder.close();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 743568d..4e46d57 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -53,6 +53,7 @@ import org.apache.kafka.common.record.Record;
import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.SimpleRecord;
import org.apache.kafka.common.requests.IsolationLevel;
+import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.ByteBufferOutputStream;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.requests.AbstractRequest;
@@ -184,17 +185,18 @@ public class FetcherTest {
assertFalse(fetcher.hasCompletedFetches());
long producerId = 1;
- short epoch = 0;
+ short producerEpoch = 0;
int baseSequence = 0;
+ int partitionLeaderEpoch = 0;
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = MemoryRecords.idempotentBuilder(buffer, CompressionType.NONE, 0L, producerId,
- epoch, baseSequence);
+ producerEpoch, baseSequence);
builder.append(0L, "key".getBytes(), null);
builder.close();
- MemoryRecords.writeEndTransactionalMarker(buffer, 1L, producerId, epoch, new EndTransactionMarker(ControlRecordType.ABORT, 0)
- );
+ MemoryRecords.writeEndTransactionalMarker(buffer, 1L, time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
+ new EndTransactionMarker(ControlRecordType.ABORT, 0));
buffer.flip();
@@ -1326,51 +1328,54 @@ public class FetcherTest {
}
@Test
- public void testWithCommittedAndAbortedTransactions() {
+ public void testReadCommittedWithCommittedAndAbortedTransactions() {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
ByteBuffer buffer = ByteBuffer.allocate(1024);
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
- int currOffset = 0;
+ long pid1 = 1L;
+ long pid2 = 2L;
+
// Appends for producer 1 (eventually committed)
- currOffset += appendTransactionalRecords(buffer, 1L, currOffset,
- new SimpleRecord(time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()),
- new SimpleRecord(time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
+ appendTransactionalRecords(buffer, pid1, 0L,
+ new SimpleRecord("commit1-1".getBytes(), "value".getBytes()),
+ new SimpleRecord("commit1-2".getBytes(), "value".getBytes()));
// Appends for producer 2 (eventually aborted)
- currOffset += appendTransactionalRecords(buffer, 2L, currOffset,
- new SimpleRecord(time.milliseconds(), "abort2-1".getBytes(), "value".getBytes()));
+ appendTransactionalRecords(buffer, pid2, 2L,
+ new SimpleRecord("abort2-1".getBytes(), "value".getBytes()));
// commit producer 1
- currOffset += commitTransaction(buffer, 1L, currOffset);
+ commitTransaction(buffer, pid1, 3L);
+
// append more for producer 2 (eventually aborted)
- currOffset += appendTransactionalRecords(buffer, 2L, currOffset,
- new SimpleRecord(time.milliseconds(), "abort2-2".getBytes(), "value".getBytes()));
+ appendTransactionalRecords(buffer, pid2, 4L,
+ new SimpleRecord("abort2-2".getBytes(), "value".getBytes()));
// abort producer 2
- currOffset += abortTransaction(buffer, 2L, currOffset);
- abortedTransactions.add(new FetchResponse.AbortedTransaction(2, 2));
+ abortTransaction(buffer, pid2, 5L);
+ abortedTransactions.add(new FetchResponse.AbortedTransaction(pid2, 2L));
// New transaction for producer 1 (eventually aborted)
- currOffset += appendTransactionalRecords(buffer, 1L, currOffset,
- new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()));
+ appendTransactionalRecords(buffer, pid1, 6L,
+ new SimpleRecord("abort1-1".getBytes(), "value".getBytes()));
// New transaction for producer 2 (eventually committed)
- currOffset += appendTransactionalRecords(buffer, 2L, currOffset,
- new SimpleRecord(time.milliseconds(), "commit2-1".getBytes(), "value".getBytes()));
+ appendTransactionalRecords(buffer, pid2, 7L,
+ new SimpleRecord("commit2-1".getBytes(), "value".getBytes()));
// Add messages for producer 1 (eventually aborted)
- currOffset += appendTransactionalRecords(buffer, 1L, currOffset,
- new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
+ appendTransactionalRecords(buffer, pid1, 8L,
+ new SimpleRecord("abort1-2".getBytes(), "value".getBytes()));
// abort producer 1
- currOffset += abortTransaction(buffer, 1L, currOffset);
+ abortTransaction(buffer, pid1, 9L);
abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 6));
// commit producer 2
- currOffset += commitTransaction(buffer, 2L, currOffset);
+ commitTransaction(buffer, pid2, 10L);
buffer.flip();
@@ -1416,7 +1421,7 @@ public class FetcherTest {
currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
new SimpleRecord(time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
- currentOffset += commitTransaction(buffer, 1L, currentOffset);
+ commitTransaction(buffer, 1L, currentOffset);
buffer.flip();
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -1447,6 +1452,108 @@ public class FetcherTest {
}
@Test
+ public void testReadCommittedAbortMarkerWithNoData() {
+ Fetcher<String, String> fetcher = createFetcher(subscriptions, new Metrics(), new StringDeserializer(),
+ new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+
+ long producerId = 1L;
+
+ abortTransaction(buffer, producerId, 5L);
+
+ appendTransactionalRecords(buffer, producerId, 6L,
+ new SimpleRecord("6".getBytes(), null),
+ new SimpleRecord("7".getBytes(), null),
+ new SimpleRecord("8".getBytes(), null));
+
+ commitTransaction(buffer, producerId, 9L);
+
+ buffer.flip();
+
+ // send the fetch
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
+ assertEquals(1, fetcher.sendFetches());
+
+ // prepare the response. the aborted transactions begin at offsets which are no longer in the log
+ List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
+ abortedTransactions.add(new FetchResponse.AbortedTransaction(producerId, 0L));
+
+ client.prepareResponse(fetchResponseWithAbortedTransactions(MemoryRecords.readableRecords(buffer),
+ abortedTransactions, Errors.NONE, 100L, 100L, 0));
+ consumerClient.poll(0);
+ assertTrue(fetcher.hasCompletedFetches());
+
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> allFetchedRecords = fetcher.fetchedRecords();
+ assertTrue(allFetchedRecords.containsKey(tp1));
+ List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp1);
+ assertEquals(3, fetchedRecords.size());
+ assertEquals(Arrays.asList(6L, 7L, 8L), collectRecordOffsets(fetchedRecords));
+ }
+
+ @Test
+ public void testReadCommittedWithCompactedTopic() {
+ Fetcher<String, String> fetcher = createFetcher(subscriptions, new Metrics(), new StringDeserializer(),
+ new StringDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_COMMITTED);
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+
+ long pid1 = 1L;
+ long pid2 = 2L;
+ long pid3 = 3L;
+
+ appendTransactionalRecords(buffer, pid3, 3L,
+ new SimpleRecord("3".getBytes(), "value".getBytes()),
+ new SimpleRecord("4".getBytes(), "value".getBytes()));
+
+ appendTransactionalRecords(buffer, pid2, 15L,
+ new SimpleRecord("15".getBytes(), "value".getBytes()),
+ new SimpleRecord("16".getBytes(), "value".getBytes()),
+ new SimpleRecord("17".getBytes(), "value".getBytes()));
+
+ appendTransactionalRecords(buffer, pid1, 22L,
+ new SimpleRecord("22".getBytes(), "value".getBytes()),
+ new SimpleRecord("23".getBytes(), "value".getBytes()));
+
+ abortTransaction(buffer, pid2, 28L);
+
+ appendTransactionalRecords(buffer, pid3, 30L,
+ new SimpleRecord("30".getBytes(), "value".getBytes()),
+ new SimpleRecord("31".getBytes(), "value".getBytes()),
+ new SimpleRecord("32".getBytes(), "value".getBytes()));
+
+ commitTransaction(buffer, pid3, 35L);
+
+ appendTransactionalRecords(buffer, pid1, 39L,
+ new SimpleRecord("39".getBytes(), "value".getBytes()),
+ new SimpleRecord("40".getBytes(), "value".getBytes()));
+
+ // transaction from pid1 is aborted, but the marker is not included in the fetch
+
+ buffer.flip();
+
+ // send the fetch
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.seek(tp1, 0);
+ assertEquals(1, fetcher.sendFetches());
+
+ // prepare the response. the aborted transactions begin at offsets which are no longer in the log
+ List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
+ abortedTransactions.add(new FetchResponse.AbortedTransaction(pid2, 6L));
+ abortedTransactions.add(new FetchResponse.AbortedTransaction(pid1, 0L));
+
+ client.prepareResponse(fetchResponseWithAbortedTransactions(MemoryRecords.readableRecords(buffer),
+ abortedTransactions, Errors.NONE, 100L, 100L, 0));
+ consumerClient.poll(0);
+ assertTrue(fetcher.hasCompletedFetches());
+
+ Map<TopicPartition, List<ConsumerRecord<String, String>>> allFetchedRecords = fetcher.fetchedRecords();
+ assertTrue(allFetchedRecords.containsKey(tp1));
+ List<ConsumerRecord<String, String>> fetchedRecords = allFetchedRecords.get(tp1);
+ assertEquals(5, fetchedRecords.size());
+ assertEquals(Arrays.asList(3L, 4L, 30L, 31L, 32L), collectRecordOffsets(fetchedRecords));
+ }
+
+ @Test
public void testReturnAbortedTransactionsinUncommittedMode() {
Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
new ByteArrayDeserializer(), Integer.MAX_VALUE, IsolationLevel.READ_UNCOMMITTED);
@@ -1457,7 +1564,7 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
- currentOffset += abortTransaction(buffer, 1L, currentOffset);
+ abortTransaction(buffer, 1L, currentOffset);
buffer.flip();
@@ -1516,9 +1623,9 @@ public class FetcherTest {
assertEquals(currentOffset, (long) subscriptions.position(tp1));
}
- private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, SimpleRecord... records) {
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
- TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true,
+ private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, int baseSequence, SimpleRecord... records) {
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, CompressionType.NONE,
+ TimestampType.CREATE_TIME, baseOffset, time.milliseconds(), pid, (short) 0, baseSequence, true,
RecordBatch.NO_PARTITION_LEADER_EPOCH);
for (SimpleRecord record : records) {
@@ -1528,14 +1635,22 @@ public class FetcherTest {
return records.length;
}
- private int commitTransaction(ByteBuffer buffer, long producerId, int baseOffset) {
- MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, producerId, (short) 0,
+ private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, SimpleRecord... records) {
+ return appendTransactionalRecords(buffer, pid, baseOffset, (int) baseOffset, records);
+ }
+
+ private int commitTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
+ short producerEpoch = 0;
+ int partitionLeaderEpoch = 0;
+ MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
new EndTransactionMarker(ControlRecordType.COMMIT, 0));
return 1;
}
private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
- MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, producerId, (short) 0,
+ short producerEpoch = 0;
+ int partitionLeaderEpoch = 0;
+ MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, time.milliseconds(), partitionLeaderEpoch, producerId, producerEpoch,
new EndTransactionMarker(ControlRecordType.ABORT, 0));
return 1;
}
@@ -1605,7 +1720,9 @@ public class FetcherTest {
private FetchResponse fetchResponseWithAbortedTransactions(MemoryRecords records,
List<FetchResponse.AbortedTransaction> abortedTransactions,
Errors error,
- long lastStableOffset, long hw, int throttleTime) {
+ long lastStableOffset,
+ long hw,
+ int throttleTime) {
Map<TopicPartition, FetchResponse.PartitionData> partitions = Collections.singletonMap(tp1,
new FetchResponse.PartitionData(error, hw, lastStableOffset, 0L, abortedTransactions, records));
return new FetchResponse(new LinkedHashMap<>(partitions), throttleTime);
@@ -1681,4 +1798,10 @@ public class FetcherTest {
isolationLevel);
}
+ private <T> List<Long> collectRecordOffsets(List<ConsumerRecord<T, T>> records) {
+ List<Long> res = new ArrayList<>(records.size());
+ for (ConsumerRecord<?, ?> record : records)
+ res.add(record.offset());
+ return res;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 0467522..a300a65 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -400,7 +400,7 @@ public class MemoryRecordsBuilderTest {
builder.append(10L, "1".getBytes(), "a".getBytes());
builder.close();
- MemoryRecords.writeEndTransactionalMarker(buffer, 1L, 15L, (short) 0,
+ MemoryRecords.writeEndTransactionalMarker(buffer, 1L, System.currentTimeMillis(), 0, 15L, (short) 0,
new EndTransactionMarker(ControlRecordType.ABORT, 0));
builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
@@ -409,7 +409,7 @@ public class MemoryRecordsBuilderTest {
builder.append(13L, "3".getBytes(), "c".getBytes());
builder.close();
- MemoryRecords.writeEndTransactionalMarker(buffer, 14L, 1L, (short) 0,
+ MemoryRecords.writeEndTransactionalMarker(buffer, 14L, System.currentTimeMillis(), 0, 1L, (short) 0,
new EndTransactionMarker(ControlRecordType.COMMIT, 0));
buffer.flip();
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 014a5bd..5a34f0f 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -222,8 +222,11 @@ public class MemoryRecordsTest {
short producerEpoch = 13;
long initialOffset = 983L;
int coordinatorEpoch = 347;
+ int partitionLeaderEpoch = 29;
+
EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
- MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, producerId, producerEpoch, marker);
+ MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, System.currentTimeMillis(),
+ partitionLeaderEpoch, producerId, producerEpoch, marker);
// verify that buffer allocation was precise
assertEquals(records.buffer().remaining(), records.buffer().capacity());
@@ -235,6 +238,7 @@ public class MemoryRecordsTest {
assertEquals(producerId, batch.producerId());
assertEquals(producerEpoch, batch.producerEpoch());
assertEquals(initialOffset, batch.baseOffset());
+ assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
assertTrue(batch.isValid());
List<Record> createdRecords = TestUtils.toList(batch);
@@ -249,6 +253,55 @@ public class MemoryRecordsTest {
}
@Test
+ public void testFilterToBatchDiscard() {
+ if (compression != CompressionType.NONE || magic >= RecordBatch.MAGIC_VALUE_V2) {
+ ByteBuffer buffer = ByteBuffer.allocate(2048);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, "1".getBytes(), "a".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 1L);
+ builder.append(11L, "2".getBytes(), "b".getBytes());
+ builder.append(12L, "3".getBytes(), "c".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L);
+ builder.append(13L, "4".getBytes(), "d".getBytes());
+ builder.append(20L, "5".getBytes(), "e".getBytes());
+ builder.append(15L, "6".getBytes(), "f".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 6L);
+ builder.append(16L, "7".getBytes(), "g".getBytes());
+ builder.close();
+
+ buffer.flip();
+
+ ByteBuffer filtered = ByteBuffer.allocate(2048);
+ MemoryRecords.readableRecords(buffer).filterTo(new MemoryRecords.RecordFilter() {
+ @Override
+ protected boolean shouldDiscard(RecordBatch batch) {
+ // discard the second and fourth batches
+ return batch.lastOffset() == 2L || batch.lastOffset() == 6L;
+ }
+
+ @Override
+ protected boolean shouldRetain(RecordBatch recordBatch, Record record) {
+ return true;
+ }
+ }, filtered);
+
+ filtered.flip();
+ MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
+
+ List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+ assertEquals(2, batches.size());
+ assertEquals(0L, batches.get(0).lastOffset());
+ assertEquals(5L, batches.get(1).lastOffset());
+ }
+ }
+
+ @Test
public void testFilterToPreservesProducerInfo() {
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
ByteBuffer buffer = ByteBuffer.allocate(2048);
@@ -490,7 +543,7 @@ public class MemoryRecordsTest {
return values;
}
- private static class RetainNonNullKeysFilter implements MemoryRecords.RecordFilter {
+ private static class RetainNonNullKeysFilter extends MemoryRecords.RecordFilter {
@Override
public boolean shouldRetain(RecordBatch batch, Record record) {
return record.hasKey();
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/main/scala/kafka/log/Log.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/Log.scala b/core/src/main/scala/kafka/log/Log.scala
index d3ea251..77ab25d 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -159,8 +159,7 @@ class Log(@volatile var dir: File,
loadSegments()
/* Calculate the offset of the next message */
- nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset,
- activeSegment.size.toInt)
+ nextOffsetMetadata = new LogOffsetMetadata(activeSegment.nextOffset, activeSegment.baseOffset, activeSegment.size)
leaderEpochCache.clearAndFlushLatest(nextOffsetMetadata.messageOffset)
@@ -879,6 +878,14 @@ class Log(@volatile var dir: File,
FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
}
+ private[log] def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long): List[AbortedTxn] = {
+ val segmentEntry = segments.floorEntry(startOffset)
+ val allAbortedTxns = ListBuffer.empty[AbortedTxn]
+ def accumulator(abortedTxns: List[AbortedTxn]): Unit = allAbortedTxns ++= abortedTxns
+ collectAbortedTransactions(logStartOffset, upperBoundOffset, segmentEntry, accumulator)
+ allAbortedTxns.toList
+ }
+
private def addAbortedTransactions(startOffset: Long, segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo = {
val fetchSize = fetchInfo.records.sizeInBytes
@@ -891,27 +898,28 @@ class Log(@volatile var dir: File,
else
logEndOffset
}
- val abortedTransactions = collectAbortedTransactions(startOffset, upperBoundOffset, segmentEntry)
+
+ val abortedTransactions = ListBuffer.empty[AbortedTransaction]
+ def accumulator(abortedTxns: List[AbortedTxn]): Unit = abortedTransactions ++= abortedTxns.map(_.asAbortedTransaction)
+ collectAbortedTransactions(startOffset, upperBoundOffset, segmentEntry, accumulator)
+
FetchDataInfo(fetchOffsetMetadata = fetchInfo.fetchOffsetMetadata,
records = fetchInfo.records,
firstEntryIncomplete = fetchInfo.firstEntryIncomplete,
- abortedTransactions = Some(abortedTransactions))
+ abortedTransactions = Some(abortedTransactions.toList))
}
private def collectAbortedTransactions(startOffset: Long, upperBoundOffset: Long,
- startingSegmentEntry: JEntry[JLong, LogSegment]): List[AbortedTransaction] = {
+ startingSegmentEntry: JEntry[JLong, LogSegment],
+ accumulator: List[AbortedTxn] => Unit): Unit = {
var segmentEntry = startingSegmentEntry
- val abortedTransactions = ListBuffer.empty[AbortedTransaction]
-
while (segmentEntry != null) {
val searchResult = segmentEntry.getValue.collectAbortedTxns(startOffset, upperBoundOffset)
- abortedTransactions ++= searchResult.abortedTransactions
+ accumulator(searchResult.abortedTransactions)
if (searchResult.isComplete)
- return abortedTransactions.toList
-
+ return
segmentEntry = segments.higherEntry(segmentEntry.getKey)
}
- abortedTransactions.toList
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/main/scala/kafka/log/LogCleaner.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 282e049..8eda2e1 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -26,16 +26,16 @@ import com.yammer.metrics.core.Gauge
import kafka.common._
import kafka.metrics.KafkaMetricsGroup
import kafka.utils._
-import org.apache.kafka.common.record.{FileRecords, MemoryRecords, Record, RecordBatch}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Time
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.record.MemoryRecords.RecordFilter
-import scala.collection._
-import JavaConverters._
+import scala.collection.mutable
+import scala.collection.JavaConverters._
/**
- * The cleaner is responsible for removing obsolete records from logs which have the dedupe retention strategy.
+ * The cleaner is responsible for removing obsolete records from logs which have the "compact" retention strategy.
* A message with key K and offset O is obsolete if there exists a message with key K and offset O' such that O < O'.
*
* Each log can be thought of being split into two sections of segments: a "clean" section which has previously been cleaned followed by a
@@ -43,7 +43,7 @@ import JavaConverters._
* The uncleanable section is excluded from cleaning. The active log segment is always uncleanable. If there is a
* compaction lag time set, segments whose largest message timestamp is within the compaction lag time of the cleaning operation are also uncleanable.
*
- * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "dedupe" retention policy
+ * The cleaning is carried out by a pool of background threads. Each thread chooses the dirtiest log that has the "compact" retention policy
* and cleans that. The dirtiness of the log is guessed by taking the ratio of bytes in the dirty section of the log to the total bytes in the log.
*
* To clean a log the cleaner first builds a mapping of key=>last_offset for the dirty section of the log. See kafka.log.OffsetMap for details of
@@ -332,10 +332,22 @@ private[log] class Cleaner(val id: Int,
* @return The first offset not cleaned and the statistics for this round of cleaning
*/
private[log] def clean(cleanable: LogToClean): (Long, CleanerStats) = {
- val stats = new CleanerStats()
+ // figure out the timestamp below which it is safe to remove delete tombstones
+ // this position is defined to be a configurable time beneath the last modified time of the last clean segment
+ val deleteHorizonMs =
+ cleanable.log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
+ case None => 0L
+ case Some(seg) => seg.lastModified - cleanable.log.config.deleteRetentionMs
+ }
+
+ doClean(cleanable, deleteHorizonMs)
+ }
+ private[log] def doClean(cleanable: LogToClean, deleteHorizonMs: Long): (Long, CleanerStats) = {
info("Beginning cleaning of log %s.".format(cleanable.log.name))
+
val log = cleanable.log
+ val stats = new CleanerStats()
// build the offset map
info("Building offset map for %s...".format(cleanable.log.name))
@@ -343,14 +355,6 @@ private[log] class Cleaner(val id: Int,
buildOffsetMap(log, cleanable.firstDirtyOffset, upperBoundOffset, offsetMap, stats)
val endOffset = offsetMap.latestOffset + 1
stats.indexDone()
-
- // figure out the timestamp below which it is safe to remove delete tombstones
- // this position is defined to be a configurable time beneath the last modified time of the last clean segment
- val deleteHorizonMs =
- log.logSegments(0, cleanable.firstDirtyOffset).lastOption match {
- case None => 0L
- case Some(seg) => seg.lastModified - log.config.deleteRetentionMs
- }
// determine the timestamp up to which the log will be cleaned
// this is the lower of the last active segment and the compaction lag
@@ -363,7 +367,7 @@ private[log] class Cleaner(val id: Int,
// record buffer utilization
stats.bufferUtilization = offsetMap.utilization
-
+
stats.allDone()
(endOffset, stats)
@@ -379,8 +383,8 @@ private[log] class Cleaner(val id: Int,
* @param stats Collector for cleaning statistics
*/
private[log] def cleanSegments(log: Log,
- segments: Seq[LogSegment],
- map: OffsetMap,
+ segments: Seq[LogSegment],
+ map: OffsetMap,
deleteHorizonMs: Long,
stats: CleanerStats) {
// create a new segment with the suffix .cleaned appended to both the log and index name
@@ -403,11 +407,24 @@ private[log] class Cleaner(val id: Int,
try {
// clean segments into the new destination segment
- for (old <- segments) {
- val retainDeletes = old.lastModified > deleteHorizonMs
+ val iter = segments.iterator
+ var currentSegmentOpt: Option[LogSegment] = Some(iter.next())
+ while (currentSegmentOpt.isDefined) {
+ val oldSegmentOpt = currentSegmentOpt.get
+ val nextSegmentOpt = if (iter.hasNext) Some(iter.next()) else None
+
+ val startOffset = oldSegmentOpt.baseOffset
+ val upperBoundOffset = nextSegmentOpt.map(_.baseOffset).getOrElse(map.latestOffset + 1)
+ val abortedTransactions = log.collectAbortedTransactions(startOffset, upperBoundOffset)
+ val transactionMetadata = CleanedTransactionMetadata(abortedTransactions, Some(txnIndex))
+
+ val retainDeletes = oldSegmentOpt.lastModified > deleteHorizonMs
info("Cleaning segment %s in log %s (largest timestamp %s) into %s, %s deletes."
- .format(old.baseOffset, log.name, new Date(old.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
- cleanInto(log.topicPartition, old, cleaned, map, retainDeletes, log.config.maxMessageSize, log.activePids, stats)
+ .format(startOffset, log.name, new Date(oldSegmentOpt.largestTimestamp), cleaned.baseOffset, if(retainDeletes) "retaining" else "discarding"))
+ cleanInto(log.topicPartition, oldSegmentOpt, cleaned, map, retainDeletes, log.config.maxMessageSize, transactionMetadata,
+ log.activePids, stats)
+
+ currentSegmentOpt = nextSegmentOpt
}
// trim excess index
@@ -454,11 +471,39 @@ private[log] class Cleaner(val id: Int,
map: OffsetMap,
retainDeletes: Boolean,
maxLogMessageSize: Int,
- activePids: Map[Long, ProducerIdEntry],
+ transactionMetadata: CleanedTransactionMetadata,
+ activeProducers: Map[Long, ProducerIdEntry],
stats: CleanerStats) {
val logCleanerFilter = new RecordFilter {
- def shouldRetain(recordBatch: RecordBatch, record: Record): Boolean =
- shouldRetainMessage(source, map, retainDeletes, recordBatch, record, stats, activePids)
+ var retainLastBatchSequence: Boolean = false
+ var discardBatchRecords: Boolean = false
+
+ override def shouldDiscard(batch: RecordBatch): Boolean = {
+ // we piggy-back on the tombstone retention logic to delay deletion of transaction markers.
+ // note that we will never delete a marker until all the records from that transaction are removed.
+ discardBatchRecords = shouldDiscardBatch(batch, transactionMetadata, retainTxnMarkers = retainDeletes)
+
+ // check if the batch contains the last sequence number for the producer. if so, we cannot
+ // remove the batch just yet or the producer may see an out of sequence error.
+ if (batch.hasProducerId && activeProducers.get(batch.producerId).exists(_.lastSeq == batch.lastSequence)) {
+ retainLastBatchSequence = true
+ false
+ } else {
+ retainLastBatchSequence = false
+ discardBatchRecords
+ }
+ }
+
+ override def shouldRetain(batch: RecordBatch, record: Record): Boolean = {
+ if (retainLastBatchSequence && batch.lastSequence == record.sequence)
+ // always retain the record with the last sequence number
+ true
+ else if (discardBatchRecords)
+ // remove the record if the batch would have otherwise been discarded
+ false
+ else
+ shouldRetainRecord(source, map, retainDeletes, batch, record, stats)
+ }
}
var position = 0
@@ -488,7 +533,7 @@ private[log] class Cleaner(val id: Int,
records = retained)
throttler.maybeThrottle(writeBuffer.limit)
}
-
+
// if we read bytes but didn't get even one complete message, our I/O buffer is too small, grow it and try again
if (readBuffer.limit > 0 && result.messagesRead == 0)
growBuffers(maxLogMessageSize)
@@ -496,24 +541,24 @@ private[log] class Cleaner(val id: Int,
restoreBuffers()
}
- private def shouldRetainMessage(source: kafka.log.LogSegment,
- map: kafka.log.OffsetMap,
- retainDeletes: Boolean,
- batch: RecordBatch,
- record: Record,
- stats: CleanerStats,
- activeProducers: Map[Long, ProducerIdEntry]): Boolean = {
- if (batch.isControlBatch)
- return true
-
- // retain the record if it is the last one produced by an active idempotent producer to ensure that
- // the producerId is not removed from the log before it has been expired
- if (batch.hasProducerId) {
- val producerId = batch.producerId
- if (RecordBatch.NO_PRODUCER_ID < producerId && activeProducers.get(producerId).exists(_.lastOffset == record.offset))
- return true
+ private def shouldDiscardBatch(batch: RecordBatch,
+ transactionMetadata: CleanedTransactionMetadata,
+ retainTxnMarkers: Boolean): Boolean = {
+ if (batch.isControlBatch) {
+ val canDiscardControlBatch = transactionMetadata.onControlBatchRead(batch)
+ canDiscardControlBatch && !retainTxnMarkers
+ } else {
+ val canDiscardBatch = transactionMetadata.onBatchRead(batch)
+ canDiscardBatch
}
+ }
+ private def shouldRetainRecord(source: kafka.log.LogSegment,
+ map: kafka.log.OffsetMap,
+ retainDeletes: Boolean,
+ batch: RecordBatch,
+ record: Record,
+ stats: CleanerStats): Boolean = {
val pastLatestOffset = record.offset > map.latestOffset
if (pastLatestOffset)
return true
@@ -546,7 +591,7 @@ private[log] class Cleaner(val id: Int,
this.readBuffer = ByteBuffer.allocate(newSize)
this.writeBuffer = ByteBuffer.allocate(newSize)
}
-
+
/**
* Restore the I/O buffer capacity to its original size
*/
@@ -609,14 +654,18 @@ private[log] class Cleaner(val id: Int,
map.clear()
val dirty = log.logSegments(start, end).toBuffer
info("Building offset map for log %s for %d segments in offset range [%d, %d).".format(log.name, dirty.size, start, end))
-
+
+ val abortedTransactions = log.collectAbortedTransactions(start, end)
+ val transactionMetadata = CleanedTransactionMetadata(abortedTransactions)
+
// Add all the cleanable dirty segments. We must take at least map.slots * load_factor,
// but we may be able to fit more (if there is lots of duplication in the dirty section of the log)
var full = false
for (segment <- dirty if !full) {
checkDone(log.topicPartition)
- full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize, stats)
+ full = buildOffsetMapForSegment(log.topicPartition, segment, map, start, log.config.maxMessageSize,
+ transactionMetadata, stats)
if (full)
debug("Offset map is full, %d segments fully mapped, segment with base offset %d is partially mapped".format(dirty.indexOf(segment), segment.baseOffset))
}
@@ -635,10 +684,11 @@ private[log] class Cleaner(val id: Int,
private def buildOffsetMapForSegment(topicPartition: TopicPartition,
segment: LogSegment,
map: OffsetMap,
- start: Long,
+ startOffset: Long,
maxLogMessageSize: Int,
+ transactionMetadata: CleanedTransactionMetadata,
stats: CleanerStats): Boolean = {
- var position = segment.index.lookup(start).position
+ var position = segment.index.lookup(startOffset).position
val maxDesiredMapSize = (map.slots * this.dupBufferLoadFactor).toInt
while (position < segment.log.sizeInBytes) {
checkDone(topicPartition)
@@ -648,14 +698,30 @@ private[log] class Cleaner(val id: Int,
throttler.maybeThrottle(records.sizeInBytes)
val startPosition = position
- for (batch <- records.batches.asScala; record <- batch.asScala) {
- if (!batch.isControlBatch && record.hasKey && record.offset >= start) {
- if (map.size < maxDesiredMapSize)
- map.put(record.key, record.offset)
- else
- return true
+ for (batch <- records.batches.asScala) {
+ if (batch.isControlBatch) {
+ transactionMetadata.onControlBatchRead(batch)
+ stats.indexMessagesRead(1)
+ } else {
+ val isAborted = transactionMetadata.onBatchRead(batch)
+ if (isAborted) {
+ // abort markers are supported in v2 and above, which means count is defined
+ stats.indexMessagesRead(batch.countOrNull)
+ } else {
+ for (record <- batch.asScala) {
+ if (record.hasKey && record.offset >= startOffset) {
+ if (map.size < maxDesiredMapSize)
+ map.put(record.key, record.offset)
+ else
+ return true
+ }
+ stats.indexMessagesRead(1)
+ }
+ }
}
- stats.indexMessagesRead(1)
+
+ if (batch.lastOffset >= startOffset)
+ map.updateLatestOffset(batch.lastOffset)
}
val bytesRead = records.validBytes
position += bytesRead
@@ -694,7 +760,7 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
def invalidMessage() {
invalidMessagesRead += 1
}
-
+
def recopyMessages(messagesWritten: Int, bytesWritten: Int) {
this.messagesWritten += messagesWritten
this.bytesWritten += bytesWritten
@@ -715,11 +781,11 @@ private class CleanerStats(time: Time = Time.SYSTEM) {
def allDone() {
endTime = time.milliseconds
}
-
+
def elapsedSecs = (endTime - startTime)/1000.0
-
+
def elapsedIndexSecs = (mapCompleteTime - startTime)/1000.0
-
+
}
/**
@@ -734,3 +800,80 @@ private case class LogToClean(topicPartition: TopicPartition, log: Log, firstDir
val cleanableRatio = cleanableBytes / totalBytes.toDouble
override def compare(that: LogToClean): Int = math.signum(this.cleanableRatio - that.cleanableRatio).toInt
}
+
+private[log] object CleanedTransactionMetadata {
+ def apply(abortedTransactions: List[AbortedTxn],
+ transactionIndex: Option[TransactionIndex] = None): CleanedTransactionMetadata = {
+ val queue = mutable.PriorityQueue.empty[AbortedTxn](new Ordering[AbortedTxn] {
+ override def compare(x: AbortedTxn, y: AbortedTxn): Int = x.firstOffset compare y.firstOffset
+ }.reverse)
+ queue ++= abortedTransactions
+ new CleanedTransactionMetadata(queue, transactionIndex)
+ }
+
+ val Empty = CleanedTransactionMetadata(List.empty[AbortedTxn])
+}
+
+/**
+ * This is a helper class to facilitate tracking transaction state while cleaning the log. It is initialized
+ * with the aborted transactions from the transaction index and its state is updated as the cleaner iterates through
+ * the log during a round of cleaning. This class is responsible for deciding when transaction markers can
+ * be removed and is therefore also responsible for updating the cleaned transaction index accordingly.
+ */
+private[log] class CleanedTransactionMetadata(val abortedTransactions: mutable.PriorityQueue[AbortedTxn],
+ val transactionIndex: Option[TransactionIndex] = None) {
+ val ongoingCommittedTxns = mutable.Set.empty[Long]
+ val ongoingAbortedTxns = mutable.Map.empty[Long, AbortedTxn]
+
+ /**
+ * Update the cleaned transaction state with a control batch that has just been traversed by the cleaner.
+ * Return true if the control batch can be discarded.
+ */
+ def onControlBatchRead(controlBatch: RecordBatch): Boolean = {
+ consumeAbortedTxnsUpTo(controlBatch.lastOffset)
+
+ val controlRecord = controlBatch.iterator.next()
+ val controlType = ControlRecordType.parse(controlRecord.key)
+ val producerId = controlBatch.producerId
+ controlType match {
+ case ControlRecordType.ABORT =>
+ val maybeAbortedTxn = ongoingAbortedTxns.remove(producerId)
+ maybeAbortedTxn.foreach { abortedTxn =>
+ transactionIndex.foreach(_.append(abortedTxn))
+ }
+ true
+
+ case ControlRecordType.COMMIT =>
+ // this marker is eligible for deletion if we didn't traverse any records from the transaction
+ !ongoingCommittedTxns.remove(producerId)
+
+ case _ => false
+ }
+ }
+
+ private def consumeAbortedTxnsUpTo(offset: Long): Unit = {
+ while (abortedTransactions.headOption.exists(_.firstOffset <= offset)) {
+ val abortedTxn = abortedTransactions.dequeue()
+ ongoingAbortedTxns += abortedTxn.producerId -> abortedTxn
+ }
+ }
+
+ /**
+ * Update the transactional state for the incoming non-control batch. If the batch is part of
+ * an aborted transaction, return true to indicate that it is safe to discard.
+ */
+ def onBatchRead(batch: RecordBatch): Boolean = {
+ consumeAbortedTxnsUpTo(batch.lastOffset)
+ if (batch.isTransactional) {
+ if (ongoingAbortedTxns.contains(batch.producerId))
+ true
+ else {
+ ongoingCommittedTxns += batch.producerId
+ false
+ }
+ } else {
+ false
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/main/scala/kafka/log/LogCleanerManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 2b4d956..6e0ebfb 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -304,19 +304,22 @@ private[log] object LogCleanerManager extends Logging {
// may be cleaned
val firstUncleanableDirtyOffset: Long = Seq (
- // the active segment is always uncleanable
- Option(log.activeSegment.baseOffset),
-
- // the first segment whose largest message timestamp is within a minimum time lag from now
- if (compactionLagMs > 0) {
- dirtyNonActiveSegments.find {
- s =>
- val isUncleanable = s.largestTimestamp > now - compactionLagMs
- debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
- isUncleanable
- } map(_.baseOffset)
- } else None
- ).flatten.min
+ // we do not clean beyond the first unstable offset
+ log.firstUnstableOffset.map(_.messageOffset),
+
+ // the active segment is always uncleanable
+ Option(log.activeSegment.baseOffset),
+
+ // the first segment whose largest message timestamp is within a minimum time lag from now
+ if (compactionLagMs > 0) {
+ dirtyNonActiveSegments.find {
+ s =>
+ val isUncleanable = s.largestTimestamp > now - compactionLagMs
+ debug(s"Checking if log segment may be cleaned: log='${log.name}' segment.baseOffset=${s.baseOffset} segment.largestTimestamp=${s.largestTimestamp}; now - compactionLag=${now - compactionLagMs}; is uncleanable=$isUncleanable")
+ isUncleanable
+ } map(_.baseOffset)
+ } else None
+ ).flatten.min
debug(s"Finding range of cleanable offsets for log=${log.name} topicPartition=$topicPartition. Last clean offset=$lastCleanOffset now=$now => firstDirtyOffset=$firstDirtyOffset firstUncleanableOffset=$firstUncleanableDirtyOffset activeSegment.baseOffset=${log.activeSegment.baseOffset}")
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/main/scala/kafka/log/OffsetMap.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/OffsetMap.scala b/core/src/main/scala/kafka/log/OffsetMap.scala
index 1df0615..8b493c2 100755
--- a/core/src/main/scala/kafka/log/OffsetMap.scala
+++ b/core/src/main/scala/kafka/log/OffsetMap.scala
@@ -27,6 +27,7 @@ trait OffsetMap {
def slots: Int
def put(key: ByteBuffer, offset: Long)
def get(key: ByteBuffer): Long
+ def updateLatestOffset(offset: Long)
def clear()
def size: Int
def utilization: Double = size.toDouble / slots
@@ -167,6 +168,10 @@ class SkimpyOffsetMap(val memory: Int, val hashAlgorithm: String = "MD5") extend
*/
override def latestOffset: Long = lastOffset
+ override def updateLatestOffset(offset: Long): Unit = {
+ lastOffset = offset
+ }
+
/**
* Calculate the ith probe position. We first try reading successive integers from the hash itself
* then if all of those fail we degrade to linear probing.
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/main/scala/kafka/log/ProducerStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/ProducerStateManager.scala b/core/src/main/scala/kafka/log/ProducerStateManager.scala
index b1a43d2..03c60e4 100644
--- a/core/src/main/scala/kafka/log/ProducerStateManager.scala
+++ b/core/src/main/scala/kafka/log/ProducerStateManager.scala
@@ -165,7 +165,7 @@ private[log] class ProducerAppendInfo(val producerId: Long, initialEntry: Produc
}
val firstOffset = currentTxnFirstOffset match {
- case Some(firstOffset) => firstOffset
+ case Some(txnFirstOffset) => txnFirstOffset
case None =>
transactions += new TxnMetadata(producerId, offset)
offset
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/main/scala/kafka/log/TransactionIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/TransactionIndex.scala b/core/src/main/scala/kafka/log/TransactionIndex.scala
index bf6a6d4..f8a3879 100644
--- a/core/src/main/scala/kafka/log/TransactionIndex.scala
+++ b/core/src/main/scala/kafka/log/TransactionIndex.scala
@@ -28,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
import scala.collection.mutable.ListBuffer
-private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTransaction], isComplete: Boolean)
+private[log] case class TxnIndexSearchResult(abortedTransactions: List[AbortedTxn], isComplete: Boolean)
/**
* The transaction index maintains metadata about the aborted transactions for each segment. This includes
@@ -114,7 +114,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
}
}
- private def iterator(allocate: () => ByteBuffer): Iterator[(AbortedTxn, Int)] = {
+ private def iterator(allocate: () => ByteBuffer = () => ByteBuffer.allocate(AbortedTxn.TotalSize)): Iterator[(AbortedTxn, Int)] = {
maybeChannel match {
case None => Iterator.empty
case Some(channel) =>
@@ -148,7 +148,7 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
}
def allAbortedTxns: List[AbortedTxn] = {
- iterator(() => ByteBuffer.allocate(AbortedTxn.TotalSize)).map(_._1).toList
+ iterator().map(_._1).toList
}
/**
@@ -160,11 +160,10 @@ class TransactionIndex(val startOffset: Long, @volatile var file: File) extends
* into the next log segment.
*/
def collectAbortedTxns(fetchOffset: Long, upperBoundOffset: Long): TxnIndexSearchResult = {
- val abortedTransactions = ListBuffer.empty[AbortedTransaction]
- val buffer = ByteBuffer.allocate(AbortedTxn.TotalSize)
- for ((abortedTxn, _) <- iterator(() => buffer)) {
+ val abortedTransactions = ListBuffer.empty[AbortedTxn]
+ for ((abortedTxn, _) <- iterator()) {
if (abortedTxn.lastOffset >= fetchOffset && abortedTxn.firstOffset < upperBoundOffset)
- abortedTransactions += abortedTxn.asAbortedTransaction
+ abortedTransactions += abortedTxn
if (abortedTxn.lastStableOffset >= upperBoundOffset)
return TxnIndexSearchResult(abortedTransactions.toList, isComplete = true)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
index c2ae155..9f9d982 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerManagerTest.scala
@@ -166,6 +166,54 @@ class LogCleanerManagerTest extends JUnitSuite with Logging {
assertEquals("The first uncleanable offset begins with active segment.", log.activeSegment.baseOffset, cleanableOffsets._2)
}
+ @Test
+ def testUndecidedTransactionalDataNotCleanable(): Unit = {
+ val topicPartition = new TopicPartition("log", 0)
+ val compactionLag = 60 * 60 * 1000
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+ logProps.put(LogConfig.MinCompactionLagMsProp, compactionLag: java.lang.Integer)
+
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val producerId = 15L
+ val producerEpoch = 0.toShort
+ val sequence = 0
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence,
+ new SimpleRecord(time.milliseconds(), "1".getBytes, "a".getBytes),
+ new SimpleRecord(time.milliseconds(), "2".getBytes, "b".getBytes)), leaderEpoch = 0)
+ log.appendAsLeader(MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence + 2,
+ new SimpleRecord(time.milliseconds(), "3".getBytes, "c".getBytes)), leaderEpoch = 0)
+ log.roll()
+ log.onHighWatermarkIncremented(3L)
+
+ time.sleep(compactionLag + 1)
+ // although the compaction lag has been exceeded, the undecided data should not be cleaned
+ var cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
+ Map(topicPartition -> 0L), time.milliseconds())
+ assertEquals(0L, cleanableOffsets._1)
+ assertEquals(0L, cleanableOffsets._2)
+
+ log.appendAsLeader(MemoryRecords.withEndTransactionMarker(time.milliseconds(), producerId, producerEpoch,
+ new EndTransactionMarker(ControlRecordType.ABORT, 15)), leaderEpoch = 0, isFromClient = false)
+ log.roll()
+ log.onHighWatermarkIncremented(4L)
+
+ // the first segment should now become cleanable immediately
+ cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
+ Map(topicPartition -> 0L), time.milliseconds())
+ assertEquals(0L, cleanableOffsets._1)
+ assertEquals(3L, cleanableOffsets._2)
+
+ time.sleep(compactionLag + 1)
+
+ // the second segment becomes cleanable after the compaction lag
+ cleanableOffsets = LogCleanerManager.cleanableOffsets(log, topicPartition,
+ Map(topicPartition -> 0L), time.milliseconds())
+ assertEquals(0L, cleanableOffsets._1)
+ assertEquals(4L, cleanableOffsets._2)
+ }
+
private def createCleanerManager(log: Log): LogCleanerManager = {
val logs = new Pool[TopicPartition, Log]()
logs.put(new TopicPartition("log", 0), log)
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index fe07fdd..c842fc3 100755
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -87,6 +87,159 @@ class LogCleanerTest extends JUnitSuite {
assertEquals(expectedBytesRead, stats.bytesRead)
}
+ @Test
+ def testBasicTransactionAwareCleaning(): Unit = {
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 2048: java.lang.Integer)
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val producerEpoch = 0.toShort
+ val pid1 = 1
+ val pid2 = 2
+
+ val appendProducer1 = appendTransactionalAsLeader(log, pid1, producerEpoch)
+ val appendProducer2 = appendTransactionalAsLeader(log, pid2, producerEpoch)
+
+ appendProducer1(Seq(1, 2))
+ appendProducer2(Seq(2, 3))
+ appendProducer1(Seq(3, 4))
+ log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ log.appendAsLeader(commitMarker(pid2, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ appendProducer1(Seq(2))
+ log.appendAsLeader(commitMarker(pid1, producerEpoch), leaderEpoch = 0, isFromClient = false)
+
+ val abortedTransactions = log.collectAbortedTransactions(log.logStartOffset, log.logEndOffset)
+
+ log.roll()
+ cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+ assertEquals(List(3, 2), keysInLog(log))
+ assertEquals(List(3, 6, 7, 8, 9), offsetsInLog(log))
+
+ // ensure the transaction index is still correct
+ assertEquals(abortedTransactions, log.collectAbortedTransactions(log.logStartOffset, log.logEndOffset))
+ }
+
+ @Test
+ def testCleanWithTransactionsSpanningSegments(): Unit = {
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val producerEpoch = 0.toShort
+ val pid1 = 1
+ val pid2 = 2
+ val pid3 = 3
+
+ val appendProducer1 = appendTransactionalAsLeader(log, pid1, producerEpoch)
+ val appendProducer2 = appendTransactionalAsLeader(log, pid2, producerEpoch)
+ val appendProducer3 = appendTransactionalAsLeader(log, pid3, producerEpoch)
+
+ appendProducer1(Seq(1, 2))
+ appendProducer3(Seq(2, 3))
+ appendProducer2(Seq(3, 4))
+
+ log.roll()
+
+ appendProducer2(Seq(5, 6))
+ appendProducer3(Seq(6, 7))
+ appendProducer1(Seq(7, 8))
+ log.appendAsLeader(abortMarker(pid2, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ appendProducer3(Seq(8, 9))
+ log.appendAsLeader(commitMarker(pid3, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ appendProducer1(Seq(9, 10))
+ log.appendAsLeader(abortMarker(pid1, producerEpoch), leaderEpoch = 0, isFromClient = false)
+
+ // we have only cleaned the records in the first segment
+ val dirtyOffset = cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))._1
+ assertEquals(List(2, 3, 5, 6, 6, 7, 7, 8, 8, 9, 9, 10), keysInLog(log))
+
+ log.roll()
+
+ // append a couple extra segments in the new segment to ensure we have sequence numbers
+ appendProducer2(Seq(11))
+ appendProducer1(Seq(12))
+
+ // finally only the keys from pid3 should remain
+ cleaner.clean(LogToClean(new TopicPartition("test", 0), log, dirtyOffset, log.activeSegment.baseOffset))
+ assertEquals(List(2, 3, 6, 7, 8, 9, 11, 12), keysInLog(log))
+ }
+
+ @Test
+ def testCommitMarkerRemoval(): Unit = {
+ val tp = new TopicPartition("test", 0)
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val producerEpoch = 0.toShort
+ val producerId = 1L
+ val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+ appendProducer(Seq(1))
+ appendProducer(Seq(2, 3))
+ log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ appendProducer(Seq(2))
+ log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ log.roll()
+
+ // cannot remove the marker in this pass because there are still valid records
+ var dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = time.milliseconds())._1
+ assertEquals(List(1, 3, 2), keysInLog(log))
+ assertEquals(List(0, 2, 3, 4, 5), offsetsInLog(log))
+
+ appendProducer(Seq(1, 3))
+ log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ log.roll()
+
+ // the first cleaning preserves the commit marker (at offset 3) since there were still records for the transaction
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = time.milliseconds())._1
+ assertEquals(List(2, 1, 3), keysInLog(log))
+ assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
+
+ // delete horizon forced to 0 to verify marker is not removed early
+ dirtyOffset = cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = 0L)._1
+ assertEquals(List(2, 1, 3), keysInLog(log))
+ assertEquals(List(3, 4, 5, 6, 7, 8), offsetsInLog(log))
+
+ // clean again with the delete horizon set back to the current time and verify the marker is removed
+ cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = time.milliseconds())
+ assertEquals(List(2, 1, 3), keysInLog(log))
+ assertEquals(List(4, 5, 6, 7, 8), offsetsInLog(log))
+ }
+
+ @Test
+ def testAbortMarkerRemoval(): Unit = {
+ val tp = new TopicPartition("test", 0)
+ val cleaner = makeCleaner(Int.MaxValue)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 256: java.lang.Integer)
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val producerEpoch = 0.toShort
+ val producerId = 1L
+ val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+ appendProducer(Seq(1))
+ appendProducer(Seq(2, 3))
+ log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ appendProducer(Seq(3))
+ log.appendAsLeader(commitMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ log.roll()
+
+ // delete horizon set to 0 to verify marker is not removed early
+ val dirtyOffset = cleaner.doClean(LogToClean(tp, log, 0L, 100L), deleteHorizonMs = 0L)._1
+ assertEquals(List(3), keysInLog(log))
+ assertEquals(List(3, 4, 5), offsetsInLog(log))
+
+ // clean again with the delete horizon set back to the current time and verify the marker is removed
+ cleaner.doClean(LogToClean(tp, log, dirtyOffset, 100L), deleteHorizonMs = time.milliseconds())
+ assertEquals(List(3), keysInLog(log))
+ assertEquals(List(4, 5), offsetsInLog(log))
+ }
+
/**
* Test log cleaning with logs containing messages larger than default message size
*/
@@ -174,24 +327,45 @@ class LogCleanerTest extends JUnitSuite {
}
@Test
- def testLogCleanerRetainsLastWrittenRecordForEachPid(): Unit = {
+ def testLogCleanerRetainsProducerLastSequence(): Unit = {
val cleaner = makeCleaner(10)
val logProps = new Properties()
logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
log.appendAsLeader(record(0, 0), leaderEpoch = 0) // offset 0
- log.appendAsLeader(record(0, 1, pid = 1, epoch = 0, sequence = 0), leaderEpoch = 0) // offset 1
- log.appendAsLeader(record(0, 2, pid = 2, epoch = 0, sequence = 0), leaderEpoch = 0) // offset 2
- log.appendAsLeader(record(0, 3, pid = 3, epoch = 0, sequence = 0), leaderEpoch = 0) // offset 3
- log.appendAsLeader(record(1, 1, pid = 2, epoch = 0, sequence = 1), leaderEpoch = 0) // offset 4
+ log.appendAsLeader(record(0, 1, producerId = 1, producerEpoch = 0, sequence = 0), leaderEpoch = 0) // offset 1
+ log.appendAsLeader(record(0, 2, producerId = 2, producerEpoch = 0, sequence = 0), leaderEpoch = 0) // offset 2
+ log.appendAsLeader(record(0, 3, producerId = 3, producerEpoch = 0, sequence = 0), leaderEpoch = 0) // offset 3
+ log.appendAsLeader(record(1, 1, producerId = 2, producerEpoch = 0, sequence = 1), leaderEpoch = 0) // offset 4
// roll the segment, so we can clean the messages already appended
log.roll()
- cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
- assertEquals(immutable.List(0, 0, 1), keysInLog(log))
- assertEquals(immutable.List(1, 3, 4), offsetsInLog(log))
+ cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+ assertEquals(List(0, 0, 1), keysInLog(log))
+ assertEquals(List(1, 3, 4), offsetsInLog(log))
+ }
+
+ @Test
+ def testLogCleanerRetainsLastSequenceEvenIfTransactionAborted(): Unit = {
+ val cleaner = makeCleaner(10)
+ val logProps = new Properties()
+ logProps.put(LogConfig.SegmentBytesProp, 1024: java.lang.Integer)
+ val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
+
+ val producerEpoch = 0.toShort
+ val producerId = 1L
+ val appendProducer = appendTransactionalAsLeader(log, producerId, producerEpoch)
+
+ appendProducer(Seq(1))
+ appendProducer(Seq(2, 3))
+ log.appendAsLeader(abortMarker(producerId, producerEpoch), leaderEpoch = 0, isFromClient = false)
+ log.roll()
+
+ cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 0L, log.activeSegment.baseOffset))
+ assertEquals(List(3), keysInLog(log))
+ assertEquals(List(2, 3), offsetsInLog(log))
}
@Test
@@ -213,17 +387,17 @@ class LogCleanerTest extends JUnitSuite {
// clean the log with only one message removed
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 2, log.activeSegment.baseOffset))
- assertEquals(immutable.List(1,0,1,0), keysInLog(log))
- assertEquals(immutable.List(1,2,3,4), offsetsInLog(log))
+ assertEquals(List(1,0,1,0), keysInLog(log))
+ assertEquals(List(1,2,3,4), offsetsInLog(log))
// continue to make progress, even though we can only clean one message at a time
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 3, log.activeSegment.baseOffset))
- assertEquals(immutable.List(0,1,0), keysInLog(log))
- assertEquals(immutable.List(2,3,4), offsetsInLog(log))
+ assertEquals(List(0,1,0), keysInLog(log))
+ assertEquals(List(2,3,4), offsetsInLog(log))
cleaner.clean(LogToClean(new TopicPartition("test", 0), log, 4, log.activeSegment.baseOffset))
- assertEquals(immutable.List(1,0), keysInLog(log))
- assertEquals(immutable.List(3,4), offsetsInLog(log))
+ assertEquals(List(1,0), keysInLog(log))
+ assertEquals(List(3,4), offsetsInLog(log))
}
@Test
@@ -346,8 +520,12 @@ class LogCleanerTest extends JUnitSuite {
}
/* extract all the keys from a log */
- def keysInLog(log: Log): Iterable[Int] =
- log.logSegments.flatMap(s => s.log.records.asScala.filter(_.hasValue).filter(_.hasKey).map(record => TestUtils.readString(record.key).toInt))
+ def keysInLog(log: Log): Iterable[Int] = {
+ for (segment <- log.logSegments;
+ batch <- segment.log.batches.asScala if !batch.isControlBatch;
+ record <- batch.asScala if record.hasValue && record.hasKey)
+ yield TestUtils.readString(record.key).toInt
+ }
/* extract all the offsets from a log */
def offsetsInLog(log: Log): Iterable[Long] =
@@ -795,12 +973,12 @@ class LogCleanerTest extends JUnitSuite {
private def messageWithOffset(key: Int, value: Int, offset: Long): MemoryRecords =
messageWithOffset(key.toString.getBytes, value.toString.getBytes, offset)
- def makeLog(dir: File = dir, config: LogConfig = logConfig) =
+ private def makeLog(dir: File = dir, config: LogConfig = logConfig) =
new Log(dir = dir, config = config, logStartOffset = 0L, recoveryPoint = 0L, scheduler = time.scheduler, time = time)
- def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ }
+ private def noOpCheckDone(topicPartition: TopicPartition) { /* do nothing */ }
- def makeCleaner(capacity: Int, checkDone: TopicPartition => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
+ private def makeCleaner(capacity: Int, checkDone: TopicPartition => Unit = noOpCheckDone, maxMessageSize: Int = 64*1024) =
new Cleaner(id = 0,
offsetMap = new FakeOffsetMap(capacity),
ioBufferSize = maxMessageSize,
@@ -810,28 +988,62 @@ class LogCleanerTest extends JUnitSuite {
time = time,
checkDone = checkDone)
- def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
+ private def writeToLog(log: Log, seq: Iterable[(Int, Int)]): Iterable[Long] = {
for((key, value) <- seq)
yield log.appendAsLeader(record(key, value), leaderEpoch = 0).firstOffset
}
- def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
-
+ private def key(id: Int) = ByteBuffer.wrap(id.toString.getBytes)
- def record(key: Int, value: Int, pid: Long = RecordBatch.NO_PRODUCER_ID, epoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
+ private def record(key: Int, value: Int,
+ producerId: Long = RecordBatch.NO_PRODUCER_ID,
+ producerEpoch: Short = RecordBatch.NO_PRODUCER_EPOCH,
sequence: Int = RecordBatch.NO_SEQUENCE,
partitionLeaderEpoch: Int = RecordBatch.NO_PARTITION_LEADER_EPOCH): MemoryRecords = {
- MemoryRecords.withIdempotentRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, CompressionType.NONE, pid, epoch, sequence,
+ MemoryRecords.withIdempotentRecords(RecordBatch.CURRENT_MAGIC_VALUE, 0L, CompressionType.NONE, producerId, producerEpoch, sequence,
partitionLeaderEpoch, new SimpleRecord(key.toString.getBytes, value.toString.getBytes))
}
- def record(key: Int, value: Array[Byte]): MemoryRecords =
+ private def transactionalRecords(records: Seq[SimpleRecord],
+ producerId: Long,
+ producerEpoch: Short,
+ sequence: Int): MemoryRecords = {
+ MemoryRecords.withTransactionalRecords(CompressionType.NONE, producerId, producerEpoch, sequence, records: _*)
+ }
+
+ private def appendTransactionalAsLeader(log: Log, producerId: Long, producerEpoch: Short = 0): Seq[Int] => Unit = {
+ var sequence = 0
+ keys: Seq[Int] => {
+ val simpleRecords = keys.map { key =>
+ val keyBytes = key.toString.getBytes
+ new SimpleRecord(keyBytes, keyBytes) // the value doesn't matter too much since we validate offsets
+ }
+ val records = transactionalRecords(simpleRecords, producerId, producerEpoch, sequence)
+ log.appendAsLeader(records, leaderEpoch = 0)
+ sequence += simpleRecords.size
+ }
+ }
+
+ private def commitMarker(producerId: Long, producerEpoch: Short, timestamp: Long = time.milliseconds()): MemoryRecords =
+ endTxnMarker(producerId, producerEpoch, ControlRecordType.COMMIT, 0L, timestamp)
+
+ private def abortMarker(producerId: Long, producerEpoch: Short, timestamp: Long = time.milliseconds()): MemoryRecords =
+ endTxnMarker(producerId, producerEpoch, ControlRecordType.ABORT, 0L, timestamp)
+
+ private def endTxnMarker(producerId: Long, producerEpoch: Short, controlRecordType: ControlRecordType,
+ offset: Long, timestamp: Long): MemoryRecords = {
+ val endTxnMarker = new EndTransactionMarker(controlRecordType, 0)
+ MemoryRecords.withEndTransactionMarker(offset, timestamp, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ producerId, producerEpoch, endTxnMarker)
+ }
+
+ private def record(key: Int, value: Array[Byte]): MemoryRecords =
TestUtils.singletonRecords(key = key.toString.getBytes, value = value)
- def unkeyedRecord(value: Int): MemoryRecords =
+ private def unkeyedRecord(value: Int): MemoryRecords =
TestUtils.singletonRecords(value = value.toString.getBytes)
- def tombstoneRecord(key: Int): MemoryRecords = record(key, null)
+ private def tombstoneRecord(key: Int): MemoryRecords = record(key, null)
}
@@ -842,12 +1054,12 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
private def keyFor(key: ByteBuffer) =
new String(Utils.readBytes(key.duplicate), "UTF-8")
- def put(key: ByteBuffer, offset: Long): Unit = {
+ override def put(key: ByteBuffer, offset: Long): Unit = {
lastOffset = offset
map.put(keyFor(key), offset)
}
- def get(key: ByteBuffer): Long = {
+ override def get(key: ByteBuffer): Long = {
val k = keyFor(key)
if(map.containsKey(k))
map.get(k)
@@ -855,11 +1067,15 @@ class FakeOffsetMap(val slots: Int) extends OffsetMap {
-1L
}
- def clear(): Unit = map.clear()
+ override def clear(): Unit = map.clear()
- def size: Int = map.size
+ override def size: Int = map.size
+
+ override def latestOffset: Long = lastOffset
- def latestOffset: Long = lastOffset
+ override def updateLatestOffset(offset: Long): Unit = {
+ lastOffset = offset
+ }
override def toString: String = map.toString
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/7baa58d7/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
index 4709b77..c3da9b3 100644
--- a/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogSegmentTest.scala
@@ -21,8 +21,7 @@ import java.io.File
import kafka.utils.TestUtils
import kafka.utils.TestUtils.checkEquals
import org.apache.kafka.common.TopicPartition
-import org.apache.kafka.common.record.MemoryRecords.withEndTransactionMarker
-import org.apache.kafka.common.record.{RecordBatch, _}
+import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.{Time, Utils}
import org.junit.Assert._
import org.junit.{After, Before, Test}
@@ -273,7 +272,8 @@ class LogSegmentTest {
@Test
def testRecoverTransactionIndex(): Unit = {
val segment = createSegment(100)
- val epoch = 0.toShort
+ val producerEpoch = 0.toShort
+ val partitionLeaderEpoch = 15
val sequence = 0
val pid1 = 5L
@@ -282,25 +282,25 @@ class LogSegmentTest {
// append transactional records from pid1
segment.append(firstOffset = 100L, largestOffset = 101L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
shallowOffsetOfMaxTimestamp = 100L, MemoryRecords.withTransactionalRecords(100L, CompressionType.NONE,
- pid1, epoch, sequence, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+ pid1, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// append transactional records from pid2
segment.append(firstOffset = 102L, largestOffset = 103L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
shallowOffsetOfMaxTimestamp = 102L, MemoryRecords.withTransactionalRecords(102L, CompressionType.NONE,
- pid2, epoch, sequence, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+ pid2, producerEpoch, sequence, partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// append non-transactional records
segment.append(firstOffset = 104L, largestOffset = 105L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
shallowOffsetOfMaxTimestamp = 104L, MemoryRecords.withRecords(104L, CompressionType.NONE,
- new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
+ partitionLeaderEpoch, new SimpleRecord("a".getBytes), new SimpleRecord("b".getBytes)))
// abort the transaction from pid2 (note LSO should be 100L since the txn from pid1 has not completed)
segment.append(firstOffset = 106L, largestOffset = 106L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
- shallowOffsetOfMaxTimestamp = 106L, endTxnRecords(ControlRecordType.ABORT, pid2, epoch, offset = 106L))
+ shallowOffsetOfMaxTimestamp = 106L, endTxnRecords(ControlRecordType.ABORT, pid2, producerEpoch, offset = 106L))
// commit the transaction from pid1
segment.append(firstOffset = 107L, largestOffset = 107L, largestTimestamp = RecordBatch.NO_TIMESTAMP,
- shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, epoch, offset = 107L))
+ shallowOffsetOfMaxTimestamp = 107L, endTxnRecords(ControlRecordType.COMMIT, pid1, producerEpoch, offset = 107L))
segment.recover(64 * 1024, new ProducerStateManager(topicPartition, logDir))
@@ -314,7 +314,7 @@ class LogSegmentTest {
// recover again, but this time assuming the transaction from pid2 began on a previous segment
val stateManager = new ProducerStateManager(topicPartition, logDir)
- stateManager.loadProducerEntry(ProducerIdEntry(pid2, epoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
+ stateManager.loadProducerEntry(ProducerIdEntry(pid2, producerEpoch, 10, 90L, 5, RecordBatch.NO_TIMESTAMP, 0, Some(75L)))
segment.recover(64 * 1024, stateManager)
abortedTxns = segment.txnIndex.allAbortedTxns
@@ -328,11 +328,13 @@ class LogSegmentTest {
private def endTxnRecords(controlRecordType: ControlRecordType,
producerId: Long,
- epoch: Short,
+ producerEpoch: Short,
offset: Long = 0L,
- coordinatorEpoch: Int = 0): MemoryRecords = {
+ partitionLeaderEpoch: Int = 0,
+ coordinatorEpoch: Int = 0,
+ timestamp: Long = RecordBatch.NO_TIMESTAMP): MemoryRecords = {
val marker = new EndTransactionMarker(controlRecordType, coordinatorEpoch)
- withEndTransactionMarker(offset, producerId, epoch, marker)
+ MemoryRecords.withEndTransactionMarker(offset, timestamp, partitionLeaderEpoch, producerId, producerEpoch, marker)
}
/**