Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/06 18:51:14 UTC
[5/6] kafka git commit: KAFKA-5121; Implement transaction index for KIP-98
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index e38e583..0a0f3d9 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -44,6 +44,7 @@ import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.ControlRecordType;
import org.apache.kafka.common.header.internals.RecordHeader;
+import org.apache.kafka.common.record.EndTransactionMarker;
import org.apache.kafka.common.record.LegacyRecord;
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.MemoryRecordsBuilder;
@@ -181,16 +182,18 @@ public class FetcherTest {
assertEquals(1, fetcher.sendFetches());
assertFalse(fetcher.hasCompletedFetches());
+ long producerId = 1;
+ short epoch = 0;
+ int baseSequence = 0;
+
ByteBuffer buffer = ByteBuffer.allocate(1024);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
- builder.append(0L, "key".getBytes(), null);
- builder.appendControlRecord(0L, ControlRecordType.COMMIT, null);
+ MemoryRecordsBuilder builder = MemoryRecords.idempotentBuilder(buffer, CompressionType.NONE, 0L, producerId,
+ epoch, baseSequence);
builder.append(0L, "key".getBytes(), null);
builder.close();
- builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 3L);
- builder.appendControlRecord(0L, ControlRecordType.ABORT, null);
- builder.close();
+ MemoryRecords.writeEndTransactionalMarker(buffer, 1L, producerId, epoch, new EndTransactionMarker(ControlRecordType.ABORT, 0)
+ );
buffer.flip();
@@ -202,10 +205,11 @@ public class FetcherTest {
assertTrue(partitionRecords.containsKey(tp1));
List<ConsumerRecord<byte[], byte[]>> records = partitionRecords.get(tp1);
- assertEquals(2, records.size());
- assertEquals(4L, subscriptions.position(tp1).longValue());
- for (ConsumerRecord<byte[], byte[]> record : records)
- assertArrayEquals("key".getBytes(), record.key());
+ assertEquals(1, records.size());
+ assertEquals(2L, subscriptions.position(tp1).longValue());
+
+ ConsumerRecord<byte[], byte[]> record = records.get(0);
+ assertArrayEquals("key".getBytes(), record.key());
}
@Test
@@ -814,6 +818,29 @@ public class FetcherTest {
}
@Test
+ public void testListOffsetsSendsIsolationLevel() {
+ for (final IsolationLevel isolationLevel : IsolationLevel.values()) {
+ Fetcher<byte[], byte[]> fetcher = createFetcher(subscriptions, new Metrics(), new ByteArrayDeserializer(),
+ new ByteArrayDeserializer(), Integer.MAX_VALUE, isolationLevel);
+
+ subscriptions.assignFromUser(singleton(tp1));
+ subscriptions.needOffsetReset(tp1, OffsetResetStrategy.LATEST);
+
+ client.prepareResponse(new MockClient.RequestMatcher() {
+ @Override
+ public boolean matches(AbstractRequest body) {
+ ListOffsetRequest request = (ListOffsetRequest) body;
+ return request.isolationLevel() == isolationLevel;
+ }
+ }, listOffsetResponse(Errors.NONE, 1L, 5L));
+ fetcher.updateFetchPositions(singleton(tp1));
+ assertFalse(subscriptions.isOffsetResetNeeded(tp1));
+ assertTrue(subscriptions.isFetchable(tp1));
+ assertEquals(5, subscriptions.position(tp1).longValue());
+ }
+ }
+
+ @Test
public void testUpdateFetchPositionResetToEarliestOffset() {
subscriptions.assignFromUser(singleton(tp1));
subscriptions.needOffsetReset(tp1, OffsetResetStrategy.EARLIEST);
@@ -1206,7 +1233,7 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
- currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+ currentOffset += abortTransaction(buffer, 1L, currentOffset);
buffer.flip();
@@ -1240,7 +1267,7 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
- currentOffset += commitTransaction(buffer, 1L, currentOffset, time.milliseconds());
+ currentOffset += commitTransaction(buffer, 1L, currentOffset);
buffer.flip();
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -1278,7 +1305,7 @@ public class FetcherTest {
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
int currOffset = 0;
- // Appends for producer 1 (evetually committed)
+ // Appends for producer 1 (eventually committed)
currOffset += appendTransactionalRecords(buffer, 1L, currOffset,
new SimpleRecord(time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
@@ -1288,13 +1315,13 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "abort2-1".getBytes(), "value".getBytes()));
// commit producer 1
- currOffset += commitTransaction(buffer, 1L, currOffset, time.milliseconds());
+ currOffset += commitTransaction(buffer, 1L, currOffset);
// append more for producer 2 (eventually aborted)
currOffset += appendTransactionalRecords(buffer, 2L, currOffset,
new SimpleRecord(time.milliseconds(), "abort2-2".getBytes(), "value".getBytes()));
// abort producer 2
- currOffset += abortTransaction(buffer, 2L, currOffset, time.milliseconds());
+ currOffset += abortTransaction(buffer, 2L, currOffset);
abortedTransactions.add(new FetchResponse.AbortedTransaction(2, 2));
// New transaction for producer 1 (eventually aborted)
@@ -1310,11 +1337,11 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
// abort producer 1
- currOffset += abortTransaction(buffer, 1L, currOffset, time.milliseconds());
+ currOffset += abortTransaction(buffer, 1L, currOffset);
abortedTransactions.add(new FetchResponse.AbortedTransaction(1, 6));
// commit producer 2
- currOffset += commitTransaction(buffer, 2L, currOffset, time.milliseconds());
+ currOffset += commitTransaction(buffer, 2L, currOffset);
buffer.flip();
@@ -1335,12 +1362,11 @@ public class FetcherTest {
assertTrue(fetchedRecords.containsKey(tp1));
// There are only 3 committed records
List<ConsumerRecord<byte[], byte[]>> fetchedConsumerRecords = fetchedRecords.get(tp1);
- Set<String> committedKeys = new HashSet<>(Arrays.asList("commit1-1", "commit1-2", "commit2-1"));
- Set<String> actuallyCommittedKeys = new HashSet<>();
+ Set<String> fetchedKeys = new HashSet<>();
for (ConsumerRecord<byte[], byte[]> consumerRecord : fetchedConsumerRecords) {
- actuallyCommittedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
+ fetchedKeys.add(new String(consumerRecord.key(), StandardCharsets.UTF_8));
}
- assertTrue(actuallyCommittedKeys.equals(committedKeys));
+ assertEquals(Utils.mkSet("commit1-1", "commit1-2", "commit2-1"), fetchedKeys);
}
@Test
@@ -1354,14 +1380,14 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
- currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+ currentOffset += abortTransaction(buffer, 1L, currentOffset);
// Duplicate abort -- should be ignored.
- currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+ currentOffset += abortTransaction(buffer, 1L, currentOffset);
// Now commit a transaction.
currentOffset += appendTransactionalRecords(buffer, 1L, currentOffset,
new SimpleRecord(time.milliseconds(), "commit1-1".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "commit1-2".getBytes(), "value".getBytes()));
- currentOffset += commitTransaction(buffer, 1L, currentOffset, time.milliseconds());
+ currentOffset += commitTransaction(buffer, 1L, currentOffset);
buffer.flip();
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -1402,7 +1428,7 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "key".getBytes(), "value".getBytes()));
- currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+ currentOffset += abortTransaction(buffer, 1L, currentOffset);
buffer.flip();
@@ -1436,7 +1462,7 @@ public class FetcherTest {
new SimpleRecord(time.milliseconds(), "abort1-1".getBytes(), "value".getBytes()),
new SimpleRecord(time.milliseconds(), "abort1-2".getBytes(), "value".getBytes()));
- currentOffset += abortTransaction(buffer, 1L, currentOffset, time.milliseconds());
+ currentOffset += abortTransaction(buffer, 1L, currentOffset);
buffer.flip();
List<FetchResponse.AbortedTransaction> abortedTransactions = new ArrayList<>();
@@ -1463,7 +1489,8 @@ public class FetcherTest {
private int appendTransactionalRecords(ByteBuffer buffer, long pid, long baseOffset, SimpleRecord... records) {
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
- TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH);
for (SimpleRecord record : records) {
builder.append(record);
@@ -1472,19 +1499,15 @@ public class FetcherTest {
return records.length;
}
- private int commitTransaction(ByteBuffer buffer, long pid, int baseOffset, long timestamp) {
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
- TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, baseOffset, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
- builder.appendControlRecord(timestamp, ControlRecordType.COMMIT, null);
- builder.build();
+ private int commitTransaction(ByteBuffer buffer, long producerId, int baseOffset) {
+ MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, producerId, (short) 0,
+ new EndTransactionMarker(ControlRecordType.COMMIT, 0));
return 1;
}
- private int abortTransaction(ByteBuffer buffer, long pid, long baseOffset, long timestamp) {
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
- TimestampType.LOG_APPEND_TIME, baseOffset, time.milliseconds(), pid, (short) 0, (int) baseOffset, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
- builder.appendControlRecord(timestamp, ControlRecordType.ABORT, null);
- builder.build();
+ private int abortTransaction(ByteBuffer buffer, long producerId, long baseOffset) {
+ MemoryRecords.writeEndTransactionalMarker(buffer, baseOffset, producerId, (short) 0,
+ new EndTransactionMarker(ControlRecordType.ABORT, 0));
return 1;
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
index 57f4663..ec858aa 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordBatchTest.java
@@ -224,25 +224,32 @@ public class DefaultRecordBatchTest {
}
@Test
- public void testReadAndWriteControlRecord() {
+ public void testReadAndWriteControlBatch() {
+ long producerId = 1L;
+ short producerEpoch = 0;
+ int coordinatorEpoch = 15;
+
ByteBuffer buffer = ByteBuffer.allocate(128);
- MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, CompressionType.NONE,
- TimestampType.CREATE_TIME, 0L);
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
+ CompressionType.NONE, TimestampType.CREATE_TIME, 0L, RecordBatch.NO_TIMESTAMP, producerId,
+ producerEpoch, RecordBatch.NO_SEQUENCE, true, true, RecordBatch.NO_PARTITION_LEADER_EPOCH,
+ buffer.remaining());
- builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.COMMIT, null);
- builder.appendControlRecord(System.currentTimeMillis(), ControlRecordType.ABORT, null);
+ EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
+ builder.appendEndTxnMarker(System.currentTimeMillis(), marker);
MemoryRecords records = builder.build();
+ List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+ assertEquals(1, batches.size());
+
+ MutableRecordBatch batch = batches.get(0);
+ assertTrue(batch.isControlBatch());
+
List<Record> logRecords = TestUtils.toList(records.records());
- assertEquals(2, logRecords.size());
+ assertEquals(1, logRecords.size());
Record commitRecord = logRecords.get(0);
- assertTrue(commitRecord.isControlRecord());
- assertEquals(ControlRecordType.COMMIT, ControlRecordType.parse(commitRecord.key()));
-
- Record abortRecord = logRecords.get(1);
- assertTrue(abortRecord.isControlRecord());
- assertEquals(ControlRecordType.ABORT, ControlRecordType.parse(abortRecord.key()));
+ assertEquals(marker, EndTransactionMarker.deserialize(commitRecord));
}
@Test
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
index 251db15..61b7b00 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
@@ -21,7 +21,6 @@ import org.apache.kafka.common.header.internals.RecordHeader;
import org.junit.Test;
import java.nio.ByteBuffer;
-import java.util.Arrays;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -45,31 +44,27 @@ public class DefaultRecordTest {
new SimpleRecord(15L, "hi".getBytes(), "there".getBytes(), headers)
};
- for (boolean isControlRecord : Arrays.asList(true, false)) {
- for (SimpleRecord record : records) {
- int baseSequence = 723;
- long baseOffset = 37;
- int offsetDelta = 10;
- long baseTimestamp = System.currentTimeMillis();
- long timestampDelta = 323;
+ for (SimpleRecord record : records) {
+ int baseSequence = 723;
+ long baseOffset = 37;
+ int offsetDelta = 10;
+ long baseTimestamp = System.currentTimeMillis();
+ long timestampDelta = 323;
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- DefaultRecord.writeTo(buffer, isControlRecord, offsetDelta, timestampDelta, record.key(),
- record.value(), record.headers());
- buffer.flip();
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+ DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, record.key(), record.value(), record.headers());
+ buffer.flip();
- DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
- assertNotNull(logRecord);
- assertEquals(baseOffset + offsetDelta, logRecord.offset());
- assertEquals(baseSequence + offsetDelta, logRecord.sequence());
- assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
- assertEquals(record.key(), logRecord.key());
- assertEquals(record.value(), logRecord.value());
- assertEquals(isControlRecord, logRecord.isControlRecord());
- assertArrayEquals(record.headers(), logRecord.headers());
- assertEquals(DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
- record.headers()), logRecord.sizeInBytes());
- }
+ DefaultRecord logRecord = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, baseSequence, null);
+ assertNotNull(logRecord);
+ assertEquals(baseOffset + offsetDelta, logRecord.offset());
+ assertEquals(baseSequence + offsetDelta, logRecord.sequence());
+ assertEquals(baseTimestamp + timestampDelta, logRecord.timestamp());
+ assertEquals(record.key(), logRecord.key());
+ assertEquals(record.value(), logRecord.value());
+ assertArrayEquals(record.headers(), logRecord.headers());
+ assertEquals(DefaultRecord.sizeInBytes(offsetDelta, timestampDelta, record.key(), record.value(),
+ record.headers()), logRecord.sizeInBytes());
}
}
@@ -83,7 +78,7 @@ public class DefaultRecordTest {
long timestampDelta = 323;
ByteBuffer buffer = ByteBuffer.allocate(1024);
- DefaultRecord.writeTo(buffer, false, offsetDelta, timestampDelta, key, value, new Header[0]);
+ DefaultRecord.writeTo(buffer, offsetDelta, timestampDelta, key, value, new Header[0]);
buffer.flip();
DefaultRecord record = DefaultRecord.readFrom(buffer, baseOffset, baseTimestamp, RecordBatch.NO_SEQUENCE, null);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
new file mode 100644
index 0000000..903f674
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/EndTransactionMarkerTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class EndTransactionMarkerTest {
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testUnknownControlTypeNotAllowed() {
+ new EndTransactionMarker(ControlRecordType.UNKNOWN, 24);
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testCannotDeserializeUnknownControlType() {
+ EndTransactionMarker.deserializeValue(ControlRecordType.UNKNOWN, ByteBuffer.wrap(new byte[0]));
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testIllegalNegativeVersion() {
+ ByteBuffer buffer = ByteBuffer.allocate(2);
+ buffer.putShort((short) -1);
+ buffer.flip();
+ EndTransactionMarker.deserializeValue(ControlRecordType.ABORT, buffer);
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testNotEnoughBytes() {
+ EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, ByteBuffer.wrap(new byte[0]));
+ }
+
+ @Test
+ public void testSerde() {
+ int coordinatorEpoch = 79;
+ EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
+ ByteBuffer buffer = marker.serializeValue();
+ EndTransactionMarker deserialized = EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer);
+ assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch());
+ }
+
+ @Test
+ public void testDeserializeNewerVersion() {
+ int coordinatorEpoch = 79;
+ ByteBuffer buffer = ByteBuffer.allocate(8);
+ buffer.putShort((short) 5);
+ buffer.putInt(coordinatorEpoch);
+ buffer.putShort((short) 0); // unexpected data
+ buffer.flip();
+ EndTransactionMarker deserialized = EndTransactionMarker.deserializeValue(ControlRecordType.COMMIT, buffer);
+ assertEquals(coordinatorEpoch, deserialized.coordinatorEpoch());
+ }
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 294f2f8..11ee419 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -140,25 +140,25 @@ public class FileRecordsTest {
int message1Size = batches.get(0).sizeInBytes();
assertEquals("Should be able to find the first message by its offset",
- new FileRecords.LogEntryPosition(0L, position, message1Size),
+ new FileRecords.LogOffsetPosition(0L, position, message1Size),
fileRecords.searchForOffsetWithSize(0, 0));
position += message1Size;
int message2Size = batches.get(1).sizeInBytes();
assertEquals("Should be able to find second message when starting from 0",
- new FileRecords.LogEntryPosition(1L, position, message2Size),
+ new FileRecords.LogOffsetPosition(1L, position, message2Size),
fileRecords.searchForOffsetWithSize(1, 0));
assertEquals("Should be able to find second message starting from its offset",
- new FileRecords.LogEntryPosition(1L, position, message2Size),
+ new FileRecords.LogOffsetPosition(1L, position, message2Size),
fileRecords.searchForOffsetWithSize(1, position));
position += message2Size + batches.get(2).sizeInBytes();
int message4Size = batches.get(3).sizeInBytes();
assertEquals("Should be able to find fourth message from a non-existant offset",
- new FileRecords.LogEntryPosition(50L, position, message4Size),
+ new FileRecords.LogOffsetPosition(50L, position, message4Size),
fileRecords.searchForOffsetWithSize(3, position));
assertEquals("Should be able to find fourth message by correct offset",
- new FileRecords.LogEntryPosition(50L, position, message4Size),
+ new FileRecords.LogOffsetPosition(50L, position, message4Size),
fileRecords.searchForOffsetWithSize(50, position));
}
@@ -241,7 +241,6 @@ public class FileRecordsTest {
EasyMock.expect(channelMock.size()).andReturn(42L).atLeastOnce();
EasyMock.expect(channelMock.position(42L)).andReturn(null).once();
EasyMock.expect(channelMock.truncate(23L)).andReturn(null).once();
- EasyMock.expect(channelMock.position(23L)).andReturn(null).once();
EasyMock.replay(channelMock);
FileRecords fileRecords = new FileRecords(tempFile(), channelMock, 0, Integer.MAX_VALUE, false);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 330879f..0467522 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -50,7 +50,7 @@ public class MemoryRecordsBuilderTest {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
MemoryRecords records = builder.build();
assertEquals(0, records.sizeInBytes());
assertEquals(bufferOffset, buffer.position());
@@ -66,8 +66,8 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
- TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH,
- buffer.capacity());
+ TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, false,
+ RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
MemoryRecords records = builder.build();
@@ -86,7 +86,7 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
}
@Test(expected = IllegalArgumentException.class)
@@ -99,7 +99,33 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWriteControlBatchNotAllowedMagicV0() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = 9809;
+ short epoch = 15;
+ int sequence = 2342;
+
+ new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
+ 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWriteControlBatchNotAllowedMagicV1() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = 9809;
+ short epoch = 15;
+ int sequence = 2342;
+
+ new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
+ 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
}
@Test(expected = IllegalArgumentException.class)
@@ -112,7 +138,7 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.close();
}
@@ -126,7 +152,7 @@ public class MemoryRecordsBuilderTest {
int sequence = 2342;
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.close();
}
@@ -140,10 +166,38 @@ public class MemoryRecordsBuilderTest {
int sequence = RecordBatch.NO_SEQUENCE;
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
- 0L, 0L, pid, epoch, sequence, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.close();
}
+ @Test(expected = IllegalArgumentException.class)
+ public void testWriteEndTxnMarkerNonTransactionalBatch() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = 9809;
+ short epoch = 15;
+ int sequence = RecordBatch.NO_SEQUENCE;
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ 0L, 0L, pid, epoch, sequence, false, true, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0));
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWriteEndTxnMarkerNonControlBatch() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = 9809;
+ short epoch = 15;
+ int sequence = RecordBatch.NO_SEQUENCE;
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ 0L, 0L, pid, epoch, sequence, true, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ builder.appendEndTxnMarker(RecordBatch.NO_TIMESTAMP, new EndTransactionMarker(ControlRecordType.ABORT, 0));
+ }
+
@Test
public void testCompressionRateV0() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
@@ -157,7 +211,7 @@ public class MemoryRecordsBuilderTest {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
int uncompressedSize = 0;
for (LegacyRecord record : records) {
@@ -188,7 +242,7 @@ public class MemoryRecordsBuilderTest {
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
int uncompressedSize = 0;
for (LegacyRecord record : records) {
@@ -214,7 +268,7 @@ public class MemoryRecordsBuilderTest {
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
- RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(0L, "b".getBytes(), "2".getBytes());
builder.append(0L, "c".getBytes(), "3".getBytes());
@@ -243,7 +297,7 @@ public class MemoryRecordsBuilderTest {
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(2L, "b".getBytes(), "2".getBytes());
builder.append(1L, "c".getBytes(), "3".getBytes());
@@ -276,7 +330,7 @@ public class MemoryRecordsBuilderTest {
ByteBuffer buffer = ByteBuffer.allocate(512);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
- RecordBatch.NO_SEQUENCE, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit);
+ RecordBatch.NO_SEQUENCE, false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, writeLimit);
assertFalse(builder.isFull());
assertTrue(builder.hasRoomFor(0L, key, value));
@@ -302,7 +356,7 @@ public class MemoryRecordsBuilderTest {
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(1L, "b".getBytes(), "2".getBytes());
@@ -330,7 +384,7 @@ public class MemoryRecordsBuilderTest {
long logAppendTime = System.currentTimeMillis();
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
- false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
+ false, false, RecordBatch.NO_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null);
@@ -346,13 +400,18 @@ public class MemoryRecordsBuilderTest {
builder.append(10L, "1".getBytes(), "a".getBytes());
builder.close();
+ MemoryRecords.writeEndTransactionalMarker(buffer, 1L, 15L, (short) 0,
+ new EndTransactionMarker(ControlRecordType.ABORT, 0));
+
builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
TimestampType.CREATE_TIME, 1L);
- builder.append(11L, "2".getBytes(), "b".getBytes());
- builder.appendControlRecord(12L, ControlRecordType.COMMIT, null);
+ builder.append(12L, "2".getBytes(), "b".getBytes());
builder.append(13L, "3".getBytes(), "c".getBytes());
builder.close();
+ MemoryRecords.writeEndTransactionalMarker(buffer, 14L, 1L, (short) 0,
+ new EndTransactionMarker(ControlRecordType.COMMIT, 0));
+
buffer.flip();
Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 49e1429..014a5bd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -65,7 +65,7 @@ public class MemoryRecordsTest {
ByteBuffer buffer = ByteBuffer.allocate(1024);
MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression,
- TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false,
+ TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false, false,
partitionLeaderEpoch, buffer.limit());
SimpleRecord[] records = new SimpleRecord[] {
@@ -216,9 +216,44 @@ public class MemoryRecordsTest {
}
@Test
+ public void testBuildEndTxnMarker() {
+ if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+ long producerId = 73;
+ short producerEpoch = 13;
+ long initialOffset = 983L;
+ int coordinatorEpoch = 347;
+ EndTransactionMarker marker = new EndTransactionMarker(ControlRecordType.COMMIT, coordinatorEpoch);
+ MemoryRecords records = MemoryRecords.withEndTransactionMarker(initialOffset, producerId, producerEpoch, marker);
+ // verify that buffer allocation was precise
+ assertEquals(records.buffer().remaining(), records.buffer().capacity());
+
+ List<MutableRecordBatch> batches = TestUtils.toList(records.batches());
+ assertEquals(1, batches.size());
+
+ RecordBatch batch = batches.get(0);
+ assertTrue(batch.isControlBatch());
+ assertEquals(producerId, batch.producerId());
+ assertEquals(producerEpoch, batch.producerEpoch());
+ assertEquals(initialOffset, batch.baseOffset());
+ assertTrue(batch.isValid());
+
+ List<Record> createdRecords = TestUtils.toList(batch);
+ assertEquals(1, createdRecords.size());
+
+ Record record = createdRecords.get(0);
+ assertTrue(record.isValid());
+ EndTransactionMarker deserializedMarker = EndTransactionMarker.deserialize(record);
+ assertEquals(ControlRecordType.COMMIT, deserializedMarker.controlType());
+ assertEquals(coordinatorEpoch, deserializedMarker.coordinatorEpoch());
+ }
+ }
+
+ @Test
public void testFilterToPreservesProducerInfo() {
if (magic >= RecordBatch.MAGIC_VALUE_V2) {
ByteBuffer buffer = ByteBuffer.allocate(2048);
+
+ // non-idempotent, non-transactional
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 0L);
builder.append(10L, null, "a".getBytes());
builder.append(11L, "1".getBytes(), "b".getBytes());
@@ -226,17 +261,28 @@ public class MemoryRecordsTest {
builder.close();
- long pid = 23L;
- short epoch = 5;
- int baseSequence = 10;
-
+ // idempotent
+ long pid1 = 23L;
+ short epoch1 = 5;
+ int baseSequence1 = 10;
builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L,
- RecordBatch.NO_TIMESTAMP, pid, epoch, baseSequence);
+ RecordBatch.NO_TIMESTAMP, pid1, epoch1, baseSequence1);
builder.append(13L, null, "d".getBytes());
builder.append(14L, "4".getBytes(), "e".getBytes());
builder.append(15L, "5".getBytes(), "f".getBytes());
builder.close();
+ // transactional
+ long pid2 = 99384L;
+ short epoch2 = 234;
+ int baseSequence2 = 15;
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.CREATE_TIME, 3L,
+ RecordBatch.NO_TIMESTAMP, pid2, epoch2, baseSequence2, true, RecordBatch.NO_PARTITION_LEADER_EPOCH);
+ builder.append(16L, "6".getBytes(), "g".getBytes());
+ builder.append(17L, null, "h".getBytes());
+ builder.append(18L, "8".getBytes(), "i".getBytes());
+ builder.close();
+
buffer.flip();
ByteBuffer filtered = ByteBuffer.allocate(2048);
@@ -246,7 +292,7 @@ public class MemoryRecordsTest {
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
- assertEquals(2, batches.size());
+ assertEquals(3, batches.size());
MutableRecordBatch firstBatch = batches.get(0);
assertEquals(1, firstBatch.countOrNull().intValue());
@@ -256,15 +302,27 @@ public class MemoryRecordsTest {
assertEquals(RecordBatch.NO_PRODUCER_EPOCH, firstBatch.producerEpoch());
assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.baseSequence());
assertEquals(RecordBatch.NO_SEQUENCE, firstBatch.lastSequence());
+ assertFalse(firstBatch.isTransactional());
MutableRecordBatch secondBatch = batches.get(1);
assertEquals(2, secondBatch.countOrNull().intValue());
assertEquals(3L, secondBatch.baseOffset());
assertEquals(5L, secondBatch.lastOffset());
- assertEquals(pid, secondBatch.producerId());
- assertEquals(epoch, secondBatch.producerEpoch());
- assertEquals(baseSequence, secondBatch.baseSequence());
- assertEquals(baseSequence + 2, secondBatch.lastSequence());
+ assertEquals(pid1, secondBatch.producerId());
+ assertEquals(epoch1, secondBatch.producerEpoch());
+ assertEquals(baseSequence1, secondBatch.baseSequence());
+ assertEquals(baseSequence1 + 2, secondBatch.lastSequence());
+ assertFalse(secondBatch.isTransactional());
+
+ MutableRecordBatch thirdBatch = batches.get(2);
+ assertEquals(2, thirdBatch.countOrNull().intValue());
+ assertEquals(3L, thirdBatch.baseOffset());
+ assertEquals(5L, thirdBatch.lastOffset());
+ assertEquals(pid2, thirdBatch.producerId());
+ assertEquals(epoch2, thirdBatch.producerEpoch());
+ assertEquals(baseSequence2, thirdBatch.baseSequence());
+ assertEquals(baseSequence2 + 2, thirdBatch.lastSequence());
+ assertTrue(thirdBatch.isTransactional());
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index c948fd1..6443e4d 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -94,6 +94,9 @@ public class RequestResponseTest {
checkRequest(createListOffsetRequest(1));
checkErrorResponse(createListOffsetRequest(1), new UnknownServerException());
checkResponse(createListOffsetResponse(1), 1);
+ checkRequest(createListOffsetRequest(2));
+ checkErrorResponse(createListOffsetRequest(2), new UnknownServerException());
+ checkResponse(createListOffsetResponse(2), 2);
checkRequest(MetadataRequest.Builder.allTopics().build((short) 2));
checkRequest(createMetadataRequest(1, asList("topic1")));
checkErrorResponse(createMetadataRequest(1, asList("topic1")), new UnknownServerException());
@@ -621,11 +624,24 @@ public class RequestResponseTest {
Map<TopicPartition, ListOffsetRequest.PartitionData> offsetData = Collections.singletonMap(
new TopicPartition("test", 0),
new ListOffsetRequest.PartitionData(1000000L, 10));
- return ListOffsetRequest.Builder.forConsumer(false).setOffsetData(offsetData).build((short) version);
+ return ListOffsetRequest.Builder
+ .forConsumer(false, IsolationLevel.READ_UNCOMMITTED)
+ .setOffsetData(offsetData)
+ .build((short) version);
} else if (version == 1) {
Map<TopicPartition, Long> offsetData = Collections.singletonMap(
new TopicPartition("test", 0), 1000000L);
- return ListOffsetRequest.Builder.forConsumer(true).setTargetTimes(offsetData).build((short) version);
+ return ListOffsetRequest.Builder
+ .forConsumer(true, IsolationLevel.READ_UNCOMMITTED)
+ .setTargetTimes(offsetData)
+ .build((short) version);
+ } else if (version == 2) {
+ Map<TopicPartition, Long> offsetData = Collections.singletonMap(
+ new TopicPartition("test", 0), 1000000L);
+ return ListOffsetRequest.Builder
+ .forConsumer(true, IsolationLevel.READ_COMMITTED)
+ .setTargetTimes(offsetData)
+ .build((short) version);
} else {
throw new IllegalArgumentException("Illegal ListOffsetRequest version " + version);
}
@@ -638,7 +654,7 @@ public class RequestResponseTest {
responseData.put(new TopicPartition("test", 0),
new ListOffsetResponse.PartitionData(Errors.NONE, asList(100L)));
return new ListOffsetResponse(responseData);
- } else if (version == 1) {
+ } else if (version == 1 || version == 2) {
Map<TopicPartition, ListOffsetResponse.PartitionData> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0),
new ListOffsetResponse.PartitionData(Errors.NONE, 10000L, 100L));
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/cluster/Partition.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Partition.scala b/core/src/main/scala/kafka/cluster/Partition.scala
index 1eea8dc..1d13689 100755
--- a/core/src/main/scala/kafka/cluster/Partition.scala
+++ b/core/src/main/scala/kafka/cluster/Partition.scala
@@ -456,7 +456,7 @@ class Partition(val topic: String,
laggingReplicas
}
- def appendRecordsToLeader(records: MemoryRecords, requiredAcks: Int = 0) = {
+ def appendRecordsToLeader(records: MemoryRecords, isFromClient: Boolean, requiredAcks: Int = 0) = {
val (info, leaderHWIncremented) = inReadLock(leaderIsrUpdateLock) {
leaderReplicaIfLocal match {
case Some(leaderReplica) =>
@@ -470,7 +470,7 @@ class Partition(val topic: String,
.format(topicPartition, inSyncSize, minIsr))
}
- val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch)
+ val info = log.appendAsLeader(records, leaderEpoch = this.leaderEpoch, isFromClient)
// probably unblock some follower fetch requests since log end offset has been updated
replicaManager.tryCompleteDelayedFetch(TopicPartitionOperationKey(this.topic, this.partitionId))
// we may need to increment high watermark since ISR could be down to 1
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/cluster/Replica.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/cluster/Replica.scala b/core/src/main/scala/kafka/cluster/Replica.scala
index a604b87..e3b1f2d 100644
--- a/core/src/main/scala/kafka/cluster/Replica.scala
+++ b/core/src/main/scala/kafka/cluster/Replica.scala
@@ -135,6 +135,7 @@ class Replica(val brokerId: Int,
def highWatermark_=(newHighWatermark: LogOffsetMetadata) {
if (isLocal) {
highWatermarkMetadata = newHighWatermark
+ log.foreach(_.onHighWatermarkIncremented(newHighWatermark.messageOffset))
trace(s"Setting high watermark for replica $brokerId partition $topicPartition to [$newHighWatermark]")
} else {
throw new KafkaException(s"Should not set high watermark on partition $topicPartition's non-local replica $brokerId")
@@ -143,6 +144,23 @@ class Replica(val brokerId: Int,
def highWatermark = highWatermarkMetadata
+ /**
+ * The last stable offset (LSO) is defined as the first offset such that all lower offsets have been "decided."
+ * Non-transactional messages are considered decided immediately, but transactional messages are only decided when
+ * the corresponding COMMIT or ABORT marker is written. This implies that the last stable offset will be equal
+ * to the high watermark if there are no transactional messages in the log. Note also that the LSO cannot advance
+ * beyond the high watermark.
+ */
+ def lastStableOffset: LogOffsetMetadata = {
+ log.map { log =>
+ log.firstUnstableOffset match {
+ case Some(offsetMetadata) if offsetMetadata.messageOffset < highWatermark.messageOffset => offsetMetadata
+ case _ => highWatermark
+ }
+ }.getOrElse(throw new KafkaException(s"Cannot fetch last stable offset on partition $topicPartition's " +
+ s"non-local replica $brokerId"))
+ }
+
def convertHWToLocalOffsetMetadata() = {
if (isLocal) {
highWatermarkMetadata = log.get.convertToOffsetMetadata(highWatermarkMetadata.messageOffset)
@@ -165,7 +183,10 @@ class Replica(val brokerId: Int,
replicaString.append("; Partition: " + partition.partitionId)
replicaString.append("; isLocal: " + isLocal)
replicaString.append("; lastCaughtUpTimeMs: " + lastCaughtUpTimeMs)
- if (isLocal) replicaString.append("; Highwatermark: " + highWatermark)
+ if (isLocal) {
+ replicaString.append("; Highwatermark: " + highWatermark)
+ replicaString.append("; LastStableOffset: " + lastStableOffset)
+ }
replicaString.toString
}
}
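
[Editor's note] The lastStableOffset doc comment above describes how the LSO is pinned to the first unstable offset and never advances past the high watermark. A minimal standalone sketch of just that selection logic, using a hypothetical simplified OffsetMetadata stand-in rather than the actual Kafka LogOffsetMetadata/Log classes, might look like:

    object LsoSketch {
      // Hypothetical simplified stand-in for the offset metadata type; illustration only.
      case class OffsetMetadata(messageOffset: Long)

      // The LSO is the first unstable offset if one exists below the high watermark,
      // otherwise it is the high watermark itself (the LSO never moves past the HW).
      def lastStableOffset(firstUnstableOffset: Option[OffsetMetadata],
                           highWatermark: OffsetMetadata): OffsetMetadata =
        firstUnstableOffset match {
          case Some(offset) if offset.messageOffset < highWatermark.messageOffset => offset
          case _ => highWatermark
        }
    }

    // Example: an open transaction whose first offset is 40 with the HW at 100 holds the LSO at 40;
    // once the transaction's end marker is written and firstUnstableOffset becomes None, the LSO is 100.
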
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
index e711392..3eafdb7 100644
--- a/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
+++ b/core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
@@ -38,7 +38,7 @@ import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.protocol.types.Type._
import org.apache.kafka.common.protocol.types.{ArrayOf, Field, Schema, Struct}
import org.apache.kafka.common.record._
-import org.apache.kafka.common.requests.OffsetFetchResponse
+import org.apache.kafka.common.requests.{IsolationLevel, OffsetFetchResponse}
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
@@ -225,7 +225,8 @@ class GroupMetadataManager(brokerId: Int,
replicaManager.appendRecords(
config.offsetCommitTimeoutMs.toLong,
config.offsetCommitRequiredAcks,
- true, // allow appending to internal offset topic
+ internalTopicsAllowed = true,
+ isFromClient = false,
delayedStore.partitionRecords,
delayedStore.callback)
}
@@ -429,7 +430,8 @@ class GroupMetadataManager(brokerId: Int,
case Some(log) =>
var currOffset = log.logStartOffset
- val buffer = ByteBuffer.allocate(config.loadBufferSize)
+ lazy val buffer = ByteBuffer.allocate(config.loadBufferSize)
+
// loop breaks if leader changes at any time during the load, since getHighWatermark is -1
val loadedOffsets = mutable.Map[GroupTopicPartition, OffsetAndMetadata]()
val removedOffsets = mutable.Set[GroupTopicPartition]()
@@ -437,12 +439,18 @@ class GroupMetadataManager(brokerId: Int,
val removedGroups = mutable.Set[String]()
while (currOffset < highWaterMark && !shuttingDown.get()) {
- buffer.clear()
- val fileRecords = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true)
- .records.asInstanceOf[FileRecords]
- val bufferRead = fileRecords.readInto(buffer, 0)
+ val fetchDataInfo = log.read(currOffset, config.loadBufferSize, maxOffset = None, minOneMessage = true,
+ isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+
+ val memRecords = fetchDataInfo.records match {
+ case records: MemoryRecords => records
+ case fileRecords: FileRecords =>
+ buffer.clear()
+ val bufferRead = fileRecords.readInto(buffer, 0)
+ MemoryRecords.readableRecords(bufferRead)
+ }
- MemoryRecords.readableRecords(bufferRead).batches.asScala.foreach { batch =>
+ memRecords.batches.asScala.foreach { batch =>
for (record <- batch.asScala) {
require(record.hasKey, "Group metadata/offset entry key should not be null")
GroupMetadataManager.readMessageKey(record.key) match {
@@ -630,7 +638,8 @@ class GroupMetadataManager(brokerId: Int,
// do not need to require acks since even if the tombstone is lost,
// it will be appended again in the next purge cycle
val records = MemoryRecords.withRecords(magicValue, 0L, compressionType, timestampType, tombstones: _*)
- partition.appendRecordsToLeader(records)
+ partition.appendRecordsToLeader(records, isFromClient = false, requiredAcks = 0)
+
offsetsRemoved += removedOffsets.size
trace(s"Successfully appended ${tombstones.size} tombstones to $appendPartition for expired/deleted " +
s"offsets and/or metadata for group $groupId")
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
index f07ca91..7930cd0 100644
--- a/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
+++ b/core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
@@ -31,6 +31,7 @@ import kafka.utils.{Logging, Pool, Scheduler, ZkUtils}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.protocol.Errors
import org.apache.kafka.common.record.{FileRecords, MemoryRecords, SimpleRecord}
+import org.apache.kafka.common.requests.IsolationLevel
import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
import org.apache.kafka.common.utils.{Time, Utils}
@@ -158,8 +159,7 @@ class TransactionStateManager(brokerId: Int,
warn(s"Attempted to load offsets and group metadata from $topicPartition, but found no log")
case Some(log) =>
- val buffer = ByteBuffer.allocate(config.transactionLogLoadBufferSize)
-
+ lazy val buffer = ByteBuffer.allocate(config.transactionLogLoadBufferSize)
val loadedTransactions = mutable.Map.empty[String, TransactionMetadata]
val removedTransactionalIds = mutable.Set.empty[String]
@@ -169,11 +169,17 @@ class TransactionStateManager(brokerId: Int,
&& loadingPartitions.contains(topicPartition.partition())
&& !shuttingDown.get()) {
buffer.clear()
- val fileRecords = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None, minOneMessage = true)
- .records.asInstanceOf[FileRecords]
- val bufferRead = fileRecords.readInto(buffer, 0)
+ val fetchDataInfo = log.read(currOffset, config.transactionLogLoadBufferSize, maxOffset = None,
+ minOneMessage = true, isolationLevel = IsolationLevel.READ_UNCOMMITTED)
+ val memRecords = fetchDataInfo.records match {
+ case records: MemoryRecords => records
+ case fileRecords: FileRecords =>
+ buffer.clear()
+ val bufferRead = fileRecords.readInto(buffer, 0)
+ MemoryRecords.readableRecords(bufferRead)
+ }
- MemoryRecords.readableRecords(bufferRead).batches.asScala.foreach { batch =>
+ memRecords.batches.asScala.foreach { batch =>
for (record <- batch.asScala) {
require(record.hasKey, "Transaction state log's key should not be null")
TransactionLog.readMessageKey(record.key) match {
@@ -414,6 +420,7 @@ class TransactionStateManager(brokerId: Int,
txnMetadata.txnTimeoutMs.toLong,
TransactionLog.EnforcedRequiredAcks,
internalTopicsAllowed = true,
+ isFromClient = false,
recordsPerPartition,
updateCacheCallback)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e71dce89/core/src/main/scala/kafka/log/AbstractIndex.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/AbstractIndex.scala b/core/src/main/scala/kafka/log/AbstractIndex.scala
index f7478ad..a125676 100644
--- a/core/src/main/scala/kafka/log/AbstractIndex.scala
+++ b/core/src/main/scala/kafka/log/AbstractIndex.scala
@@ -246,14 +246,26 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
* @param target The index key to look for
* @return The slot found or -1 if the least entry in the index is larger than the target key or the index is empty
*/
- protected def indexSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int = {
+ protected def largestLowerBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int =
+ indexSlotRangeFor(idx, target, searchEntity)._1
+
+ /**
+ * Find the smallest entry greater than or equal to the target key or value. If none can be found, -1 is returned.
+ */
+ protected def smallestUpperBoundSlotFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): Int =
+ indexSlotRangeFor(idx, target, searchEntity)._2
+
+ /**
+ * Lookup lower and upper bounds for the given target.
+ */
+ private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
// check if the index is empty
if(_entries == 0)
- return -1
+ return (-1, -1)
// check if the target offset is smaller than the least offset
if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
- return -1
+ return (-1, 0)
// binary search for the entry
var lo = 0
@@ -267,9 +279,10 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
else if(compareResult < 0)
lo = mid
else
- return mid
+ return (mid, mid)
}
- lo
+
+ (lo, if (lo == _entries - 1) -1 else lo + 1)
}
private def compareIndexEntry(indexEntry: IndexEntry, target: Long, searchEntity: IndexSearchEntity): Int = {