Posted to commits@kafka.apache.org by ij...@apache.org on 2017/03/24 19:43:58 UTC
[05/11] kafka git commit: KAFKA-4816; Message format changes for idempotent/transactional producer (KIP-98)
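The headline API change visible in these test updates is that MemoryRecordsBuilder now threads the idempotent/transactional producer state through every batch: producer id, producer epoch, base sequence, a transactional flag, and the partition leader epoch. A minimal sketch of the widened constructor, condensed from testWriteTransactionalRecordSet in the diff below (CompressionType.NONE is substituted here for the test's parameterized compression field):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.*;

    ByteBuffer buffer = ByteBuffer.allocate(128);
    // isTransactional = true requires magic >= 2; the MagicV0/V1 tests below
    // expect IllegalArgumentException when it is set for older formats.
    MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE,
            CompressionType.NONE, TimestampType.CREATE_TIME, 0L, 0L,
            9809L /* producer id */, (short) 15 /* producer epoch */, 2342 /* base sequence */,
            true /* isTransactional */, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
    builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
    MemoryRecords records = builder.build();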
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index e1dcae4..ef48783 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -47,29 +48,116 @@ public class MemoryRecordsBuilderTest {
ByteBuffer buffer = ByteBuffer.allocate(128);
buffer.position(bufferOffset);
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType,
- TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+ TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+ false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
MemoryRecords records = builder.build();
assertEquals(0, records.sizeInBytes());
assertEquals(bufferOffset, buffer.position());
}
@Test
+ public void testWriteTransactionalRecordSet() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = 9809;
+ short epoch = 15;
+ int sequence = 2342;
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
+ TimestampType.CREATE_TIME, 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH,
+ buffer.capacity());
+ builder.append(System.currentTimeMillis(), "foo".getBytes(), "bar".getBytes());
+ MemoryRecords records = builder.build();
+
+ List<MutableRecordBatch> batches = Utils.toList(records.batches().iterator());
+ assertEquals(1, batches.size());
+ assertTrue(batches.get(0).isTransactional());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWriteTransactionalNotAllowedMagicV0() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = 9809;
+ short epoch = 15;
+ int sequence = 2342;
+
+ new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType, TimestampType.CREATE_TIME,
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWriteTransactionalNotAllowedMagicV1() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = 9809;
+ short epoch = 15;
+ int sequence = 2342;
+
+ new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWriteTransactionalWithInvalidPID() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = RecordBatch.NO_PRODUCER_ID;
+ short epoch = 15;
+ int sequence = 2342;
+
+ new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWriteIdempotentWithInvalidEpoch() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = 9809;
+ short epoch = RecordBatch.NO_PRODUCER_EPOCH;
+ int sequence = 2342;
+
+ new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ }
+
+ @Test(expected = IllegalArgumentException.class)
+ public void testWriteIdempotentWithInvalidBaseSequence() {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ buffer.position(bufferOffset);
+
+ long pid = 9809;
+ short epoch = 15;
+ int sequence = RecordBatch.NO_SEQUENCE;
+
+ new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType, TimestampType.CREATE_TIME,
+ 0L, 0L, pid, epoch, sequence, true, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
+ }
+
+ @Test
public void testCompressionRateV0() {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
- Record[] records = new Record[] {
- Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()),
- Record.create(Record.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()),
- Record.create(Record.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()),
+ LegacyRecord[] records = new LegacyRecord[] {
+ LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()),
+ LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 1L, "b".getBytes(), "2".getBytes()),
+ LegacyRecord.create(RecordBatch.MAGIC_VALUE_V0, 2L, "c".getBytes(), "3".getBytes()),
};
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V0, compressionType,
- TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
+ TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+ false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
int uncompressedSize = 0;
- for (Record record : records) {
+ for (LegacyRecord record : records) {
uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
builder.append(record);
}
@@ -78,7 +166,7 @@ public class MemoryRecordsBuilderTest {
if (compressionType == CompressionType.NONE) {
assertEquals(1.0, builder.compressionRate(), 0.00001);
} else {
- int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V0;
+ int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V0;
double computedCompressionRate = (double) compressedSize / uncompressedSize;
assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
}
@@ -89,17 +177,18 @@ public class MemoryRecordsBuilderTest {
ByteBuffer buffer = ByteBuffer.allocate(1024);
buffer.position(bufferOffset);
- Record[] records = new Record[] {
- Record.create(Record.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()),
- Record.create(Record.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()),
- Record.create(Record.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()),
+ LegacyRecord[] records = new LegacyRecord[] {
+ LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 0L, "a".getBytes(), "1".getBytes()),
+ LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 1L, "b".getBytes(), "2".getBytes()),
+ LegacyRecord.create(RecordBatch.MAGIC_VALUE_V1, 2L, "c".getBytes(), "3".getBytes()),
};
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
- TimestampType.CREATE_TIME, 0L, 0L, buffer.capacity());
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+ TimestampType.CREATE_TIME, 0L, 0L, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+ false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
int uncompressedSize = 0;
- for (Record record : records) {
+ for (LegacyRecord record : records) {
uncompressedSize += record.sizeInBytes() + Records.LOG_OVERHEAD;
builder.append(record);
}
@@ -108,7 +197,7 @@ public class MemoryRecordsBuilderTest {
if (compressionType == CompressionType.NONE) {
assertEquals(1.0, builder.compressionRate(), 0.00001);
} else {
- int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - Record.RECORD_OVERHEAD_V1;
+ int compressedSize = built.sizeInBytes() - Records.LOG_OVERHEAD - LegacyRecord.RECORD_OVERHEAD_V1;
double computedCompressionRate = (double) compressedSize / uncompressedSize;
assertEquals(computedCompressionRate, builder.compressionRate(), 0.00001);
}
@@ -120,8 +209,9 @@ public class MemoryRecordsBuilderTest {
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
- TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+ TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(0L, "b".getBytes(), "2".getBytes());
builder.append(0L, "c".getBytes(), "3".getBytes());
@@ -132,33 +222,10 @@ public class MemoryRecordsBuilderTest {
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
- for (Record record : records.records()) {
- assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
- assertEquals(logAppendTime, record.timestamp());
- }
- }
-
- @Test
- public void convertUsingLogAppendTime() {
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- buffer.position(bufferOffset);
-
- long logAppendTime = System.currentTimeMillis();
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
- TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, buffer.capacity());
-
- builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
- builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
- builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
- MemoryRecords records = builder.build();
-
- MemoryRecordsBuilder.RecordsInfo info = builder.info();
- assertEquals(logAppendTime, info.maxTimestamp);
- assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
-
- for (Record record : records.records()) {
- assertEquals(TimestampType.LOG_APPEND_TIME, record.timestampType());
- assertEquals(logAppendTime, record.timestamp());
+ for (RecordBatch batch : records.batches()) {
+ assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
+ for (Record record : batch)
+ assertEquals(logAppendTime, record.timestamp());
}
}
@@ -168,8 +235,9 @@ public class MemoryRecordsBuilderTest {
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
- TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+ TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+ false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(2L, "b".getBytes(), "2".getBytes());
builder.append(1L, "c".getBytes(), "3".getBytes());
@@ -185,9 +253,10 @@ public class MemoryRecordsBuilderTest {
int i = 0;
long[] expectedTimestamps = new long[] {0L, 2L, 1L};
- for (Record record : records.records()) {
- assertEquals(TimestampType.CREATE_TIME, record.timestampType());
- assertEquals(expectedTimestamps[i++], record.timestamp());
+ for (RecordBatch batch : records.batches()) {
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+ for (Record record : batch)
+ assertEquals(expectedTimestamps[i++], record.timestamp());
}
}
@@ -199,15 +268,16 @@ public class MemoryRecordsBuilderTest {
byte[] value = "bar".getBytes();
int writeLimit = 0;
ByteBuffer buffer = ByteBuffer.allocate(512);
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType,
- TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, writeLimit);
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.CURRENT_MAGIC_VALUE, compressionType,
+ TimestampType.CREATE_TIME, 0L, LegacyRecord.NO_TIMESTAMP, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH,
+ RecordBatch.NO_SEQUENCE, false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, writeLimit);
assertFalse(builder.isFull());
- assertTrue(builder.hasRoomFor(key, value));
+ assertTrue(builder.hasRoomFor(0L, key, value));
builder.append(0L, key, value);
assertTrue(builder.isFull());
- assertFalse(builder.hasRoomFor(key, value));
+ assertFalse(builder.hasRoomFor(0L, key, value));
MemoryRecords memRecords = builder.build();
List<Record> records = TestUtils.toList(memRecords.records());
@@ -224,12 +294,13 @@ public class MemoryRecordsBuilderTest {
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
- TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+ TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+ false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.append(0L, "a".getBytes(), "1".getBytes());
builder.append(1L, "b".getBytes(), "2".getBytes());
- assertFalse(builder.hasRoomFor("c".getBytes(), "3".getBytes()));
+ assertFalse(builder.hasRoomFor(2L, "c".getBytes(), "3".getBytes()));
builder.append(2L, "c".getBytes(), "3".getBytes());
MemoryRecords records = builder.build();
@@ -238,9 +309,10 @@ public class MemoryRecordsBuilderTest {
assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
long i = 0L;
- for (Record record : records.records()) {
- assertEquals(TimestampType.CREATE_TIME, record.timestampType());
- assertEquals(i++, record.timestamp());
+ for (RecordBatch batch : records.batches()) {
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+ for (Record record : batch)
+ assertEquals(i++, record.timestamp());
}
}
@@ -250,8 +322,9 @@ public class MemoryRecordsBuilderTest {
buffer.position(bufferOffset);
long logAppendTime = System.currentTimeMillis();
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
- TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
+ TimestampType.CREATE_TIME, 0L, logAppendTime, RecordBatch.NO_PRODUCER_ID, RecordBatch.NO_PRODUCER_EPOCH, RecordBatch.NO_SEQUENCE,
+ false, RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, buffer.capacity());
builder.appendWithOffset(0L, System.currentTimeMillis(), "a".getBytes(), null);
@@ -259,40 +332,78 @@ public class MemoryRecordsBuilderTest {
builder.appendWithOffset(0L, System.currentTimeMillis(), "b".getBytes(), null);
}
- @Test(expected = IllegalArgumentException.class)
- public void testAppendWithInvalidMagic() {
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- buffer.position(bufferOffset);
-
- long logAppendTime = System.currentTimeMillis();
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
- TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
+ @Test
+ public void convertV2ToV1UsingMixedCreateAndLogAppendTime() {
+ ByteBuffer buffer = ByteBuffer.allocate(512);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
+ compressionType, TimestampType.LOG_APPEND_TIME, 0L);
+ builder.append(10L, "1".getBytes(), "a".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
+ TimestampType.CREATE_TIME, 1L);
+ builder.append(11L, "2".getBytes(), "b".getBytes());
+ builder.appendControlRecord(12L, ControlRecordType.COMMIT, null);
+ builder.append(13L, "3".getBytes(), "c".getBytes());
+ builder.close();
+
+ buffer.flip();
+
+ Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+
+ List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
+ if (compressionType != CompressionType.NONE) {
+ assertEquals(2, batches.size());
+ assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
+ assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
+ } else {
+ assertEquals(3, batches.size());
+ assertEquals(TimestampType.LOG_APPEND_TIME, batches.get(0).timestampType());
+ assertEquals(TimestampType.CREATE_TIME, batches.get(1).timestampType());
+ assertEquals(TimestampType.CREATE_TIME, batches.get(2).timestampType());
+ }
- builder.append(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), null));
+ List<Record> logRecords = Utils.toList(records.records().iterator());
+ assertEquals(3, logRecords.size());
+ assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
+ assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
+ assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
}
@Test
- public void convertUsingCreateTime() {
- ByteBuffer buffer = ByteBuffer.allocate(1024);
- buffer.position(bufferOffset);
-
- long logAppendTime = System.currentTimeMillis();
- MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.MAGIC_VALUE_V1, compressionType,
- TimestampType.CREATE_TIME, 0L, logAppendTime, buffer.capacity());
-
- builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "a".getBytes(), "1".getBytes()));
- builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "b".getBytes(), "2".getBytes()));
- builder.convertAndAppend(Record.create(Record.MAGIC_VALUE_V0, 0L, "c".getBytes(), "3".getBytes()));
- MemoryRecords records = builder.build();
-
- MemoryRecordsBuilder.RecordsInfo info = builder.info();
- assertEquals(Record.NO_TIMESTAMP, info.maxTimestamp);
- assertEquals(2L, info.shallowOffsetOfMaxTimestamp);
-
- for (Record record : records.records()) {
- assertEquals(TimestampType.CREATE_TIME, record.timestampType());
- assertEquals(Record.NO_TIMESTAMP, record.timestamp());
+ public void convertToV1WithMixedV0AndV2Data() {
+ ByteBuffer buffer = ByteBuffer.allocate(512);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0,
+ compressionType, TimestampType.NO_TIMESTAMP_TYPE, 0L);
+ builder.append(RecordBatch.NO_TIMESTAMP, "1".getBytes(), "a".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
+ TimestampType.CREATE_TIME, 1L);
+ builder.append(11L, "2".getBytes(), "b".getBytes());
+ builder.append(12L, "3".getBytes(), "c".getBytes());
+ builder.close();
+
+ buffer.flip();
+
+ Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+
+ List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
+ if (compressionType != CompressionType.NONE) {
+ assertEquals(2, batches.size());
+ assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+ assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+ } else {
+ assertEquals(3, batches.size());
+ assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+ assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+ assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(2).magic());
}
+
+ List<Record> logRecords = Utils.toList(records.records().iterator());
+ assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
+ assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
+ assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
}
@Parameterized.Parameters
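The two new convert* tests above are the interesting behavioural cases: down-converting v2 data to v1 preserves batch boundaries when compression is used, but with no compression each record becomes its own legacy entry (and pre-v2 batches pass through unchanged). A condensed sketch of the conversion path they exercise, assuming a single uncompressed v2 batch:

    ByteBuffer buffer = ByteBuffer.allocate(512);
    MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
            CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
    builder.append(10L, "1".getBytes(), "a".getBytes());
    builder.close();
    buffer.flip();

    // Batches written with magic v2 are rewritten with magic v1.
    Records downConverted = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);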
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index bfe0a57..8cead03 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.test.TestUtils;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -24,13 +25,12 @@ import org.junit.runners.Parameterized;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Iterator;
import java.util.List;
import static java.util.Arrays.asList;
-import static org.apache.kafka.common.utils.Utils.toNullableArray;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
@@ -39,57 +39,106 @@ public class MemoryRecordsTest {
private CompressionType compression;
private byte magic;
private long firstOffset;
+ private long pid;
+ private short epoch;
+ private int firstSequence;
+ private long logAppendTime = System.currentTimeMillis();
+ private int partitionLeaderEpoch = 998;
public MemoryRecordsTest(byte magic, long firstOffset, CompressionType compression) {
this.magic = magic;
this.compression = compression;
this.firstOffset = firstOffset;
+ if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+ pid = 134234L;
+ epoch = 28;
+ firstSequence = 777;
+ } else {
+ pid = RecordBatch.NO_PRODUCER_ID;
+ epoch = RecordBatch.NO_PRODUCER_EPOCH;
+ firstSequence = RecordBatch.NO_SEQUENCE;
+ }
}
@Test
public void testIterator() {
- MemoryRecordsBuilder builder1 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
- MemoryRecordsBuilder builder2 = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME, firstOffset);
- List<Record> list = asList(
- Record.create(magic, 1L, "a".getBytes(), "1".getBytes()),
- Record.create(magic, 2L, "b".getBytes(), "2".getBytes()),
- Record.create(magic, 3L, "c".getBytes(), "3".getBytes()),
- Record.create(magic, 4L, null, "4".getBytes()),
- Record.create(magic, 5L, "e".getBytes(), null),
- Record.create(magic, 6L, null, null));
-
- for (int i = 0; i < list.size(); i++) {
- Record r = list.get(i);
- builder1.append(r);
- builder2.append(i + 1, toNullableArray(r.key()), toNullableArray(r.value()));
- }
+ ByteBuffer buffer = ByteBuffer.allocate(1024);
+
+ MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, magic, compression,
+ TimestampType.CREATE_TIME, firstOffset, logAppendTime, pid, epoch, firstSequence, false,
+ partitionLeaderEpoch, buffer.limit());
- MemoryRecords recs1 = builder1.build();
- MemoryRecords recs2 = builder2.build();
+ SimpleRecord[] records = new SimpleRecord[] {
+ new SimpleRecord(1L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(2L, "b".getBytes(), "2".getBytes()),
+ new SimpleRecord(3L, "c".getBytes(), "3".getBytes()),
+ new SimpleRecord(4L, null, "4".getBytes()),
+ new SimpleRecord(5L, "d".getBytes(), null),
+ new SimpleRecord(6L, (byte[]) null, null)
+ };
+ for (SimpleRecord record : records)
+ builder.append(record);
+
+ MemoryRecords memoryRecords = builder.build();
for (int iteration = 0; iteration < 2; iteration++) {
- for (MemoryRecords recs : asList(recs1, recs2)) {
- Iterator<LogEntry> iter = recs.deepEntries().iterator();
- for (int i = 0; i < list.size(); i++) {
- assertTrue(iter.hasNext());
- LogEntry entry = iter.next();
- assertEquals(firstOffset + i, entry.offset());
- assertEquals(list.get(i), entry.record());
- entry.record().ensureValid();
+ int total = 0;
+ for (RecordBatch batch : memoryRecords.batches()) {
+ assertTrue(batch.isValid());
+ assertEquals(compression, batch.compressionType());
+ assertEquals(firstOffset + total, batch.baseOffset());
+
+ if (magic >= RecordBatch.MAGIC_VALUE_V2) {
+ assertEquals(pid, batch.producerId());
+ assertEquals(epoch, batch.producerEpoch());
+ assertEquals(firstSequence + total, batch.baseSequence());
+ assertEquals(partitionLeaderEpoch, batch.partitionLeaderEpoch());
+ assertEquals(records.length, batch.countOrNull().intValue());
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+ assertEquals(records[records.length - 1].timestamp(), batch.maxTimestamp());
+ } else {
+ assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+ assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+ assertEquals(RecordBatch.UNKNOWN_PARTITION_LEADER_EPOCH, batch.partitionLeaderEpoch());
+ assertNull(batch.countOrNull());
+ }
+
+ int recordCount = 0;
+ for (Record record : batch) {
+ assertTrue(record.isValid());
+ assertTrue(record.hasMagic(batch.magic()));
+ assertFalse(record.isCompressed());
+ assertEquals(firstOffset + total, record.offset());
+ assertEquals(records[total].key(), record.key());
+ assertEquals(records[total].value(), record.value());
+
+ if (magic >= RecordBatch.MAGIC_VALUE_V2)
+ assertEquals(firstSequence + total, record.sequence());
+
+ if (magic > RecordBatch.MAGIC_VALUE_V0) {
+ assertEquals(records[total].timestamp(), record.timestamp());
+ if (magic < RecordBatch.MAGIC_VALUE_V2)
+ assertTrue(record.hasTimestampType(batch.timestampType()));
+ }
+
+ total++;
+ recordCount++;
}
- assertFalse(iter.hasNext());
+
+ assertEquals(batch.baseOffset() + recordCount - 1, batch.lastOffset());
}
}
}
@Test
public void testHasRoomForMethod() {
- MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression, TimestampType.CREATE_TIME);
- builder.append(Record.create(magic, 0L, "a".getBytes(), "1".getBytes()));
-
- assertTrue(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
+ MemoryRecordsBuilder builder = MemoryRecords.builder(ByteBuffer.allocate(1024), magic, compression,
+ TimestampType.CREATE_TIME, 0L);
+ builder.append(0L, "a".getBytes(), "1".getBytes());
+ assertTrue(builder.hasRoomFor(1L, "b".getBytes(), "2".getBytes()));
builder.close();
- assertFalse(builder.hasRoomFor("b".getBytes(), "2".getBytes()));
+ assertFalse(builder.hasRoomFor(1L, "b".getBytes(), "2".getBytes()));
}
@Test
@@ -135,37 +184,72 @@ public class MemoryRecordsTest {
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
- List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
- List<Long> expectedOffsets = compression == CompressionType.NONE ? asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L);
- assertEquals(expectedOffsets.size(), shallowEntries.size());
-
- for (int i = 0; i < expectedOffsets.size(); i++) {
- LogEntry shallowEntry = shallowEntries.get(i);
- assertEquals(expectedOffsets.get(i).longValue(), shallowEntry.offset());
- assertEquals(magic, shallowEntry.record().magic());
- assertEquals(compression, shallowEntry.record().compressionType());
- assertEquals(magic == Record.MAGIC_VALUE_V0 ? TimestampType.NO_TIMESTAMP_TYPE : TimestampType.CREATE_TIME,
- shallowEntry.record().timestampType());
+ List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+ final List<Long> expectedEndOffsets;
+ final List<Long> expectedStartOffsets;
+ final List<Long> expectedMaxTimestamps;
+
+ if (magic < RecordBatch.MAGIC_VALUE_V2 && compression == CompressionType.NONE) {
+ expectedEndOffsets = asList(1L, 4L, 5L, 6L);
+ expectedStartOffsets = asList(1L, 4L, 5L, 6L);
+ expectedMaxTimestamps = asList(11L, 20L, 15L, 16L);
+ } else if (magic < RecordBatch.MAGIC_VALUE_V2) {
+ expectedEndOffsets = asList(1L, 5L, 6L);
+ expectedStartOffsets = asList(1L, 4L, 6L);
+ expectedMaxTimestamps = asList(11L, 20L, 16L);
+ } else {
+ expectedEndOffsets = asList(1L, 5L, 6L);
+ expectedStartOffsets = asList(1L, 3L, 6L);
+ expectedMaxTimestamps = asList(11L, 20L, 16L);
+ }
+
+ assertEquals(expectedEndOffsets.size(), batches.size());
+
+ for (int i = 0; i < expectedEndOffsets.size(); i++) {
+ RecordBatch batch = batches.get(i);
+ assertEquals(expectedStartOffsets.get(i).longValue(), batch.baseOffset());
+ assertEquals(expectedEndOffsets.get(i).longValue(), batch.lastOffset());
+ assertEquals(magic, batch.magic());
+ assertEquals(compression, batch.compressionType());
+ if (magic >= RecordBatch.MAGIC_VALUE_V1) {
+ assertEquals(expectedMaxTimestamps.get(i).longValue(), batch.maxTimestamp());
+ assertEquals(TimestampType.CREATE_TIME, batch.timestampType());
+ } else {
+ assertEquals(RecordBatch.NO_TIMESTAMP, batch.maxTimestamp());
+ assertEquals(TimestampType.NO_TIMESTAMP_TYPE, batch.timestampType());
+ }
}
- List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries());
- assertEquals(4, deepEntries.size());
+ List<Record> records = TestUtils.toList(filteredRecords.records());
+ assertEquals(4, records.size());
- LogEntry first = deepEntries.get(0);
+ Record first = records.get(0);
assertEquals(1L, first.offset());
- assertEquals(Record.create(magic, 11L, "1".getBytes(), "b".getBytes()), first.record());
+ if (magic > RecordBatch.MAGIC_VALUE_V0)
+ assertEquals(11L, first.timestamp());
+ assertEquals("1", Utils.utf8(first.key(), first.keySize()));
+ assertEquals("b", Utils.utf8(first.value(), first.valueSize()));
- LogEntry second = deepEntries.get(1);
+ Record second = records.get(1);
assertEquals(4L, second.offset());
- assertEquals(Record.create(magic, 20L, "4".getBytes(), "e".getBytes()), second.record());
+ if (magic > RecordBatch.MAGIC_VALUE_V0)
+ assertEquals(20L, second.timestamp());
+ assertEquals("4", Utils.utf8(second.key(), second.keySize()));
+ assertEquals("e", Utils.utf8(second.value(), second.valueSize()));
- LogEntry third = deepEntries.get(2);
+ Record third = records.get(2);
assertEquals(5L, third.offset());
- assertEquals(Record.create(magic, 15L, "5".getBytes(), "f".getBytes()), third.record());
+ if (magic > RecordBatch.MAGIC_VALUE_V0)
+ assertEquals(15L, third.timestamp());
+ assertEquals("5", Utils.utf8(third.key(), third.keySize()));
+ assertEquals("f", Utils.utf8(third.value(), third.valueSize()));
- LogEntry fourth = deepEntries.get(3);
+ Record fourth = records.get(3);
assertEquals(6L, fourth.offset());
- assertEquals(Record.create(magic, 16L, "6".getBytes(), "g".getBytes()), fourth.record());
+ if (magic > RecordBatch.MAGIC_VALUE_V0)
+ assertEquals(16L, fourth.timestamp());
+ assertEquals("6", Utils.utf8(fourth.key(), fourth.keySize()));
+ assertEquals("g", Utils.utf8(fourth.value(), fourth.valueSize()));
}
@Test
@@ -174,16 +258,18 @@ public class MemoryRecordsTest {
ByteBuffer buffer = ByteBuffer.allocate(2048);
MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, magic, compression,
- TimestampType.LOG_APPEND_TIME, 0L, logAppendTime);
+ TimestampType.LOG_APPEND_TIME, 0L, logAppendTime, pid, epoch, firstSequence);
builder.append(10L, null, "a".getBytes());
builder.close();
- builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime);
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 1L, logAppendTime,
+ pid, epoch, firstSequence);
builder.append(11L, "1".getBytes(), "b".getBytes());
builder.append(12L, null, "c".getBytes());
builder.close();
- builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime);
+ builder = MemoryRecords.builder(buffer, magic, compression, TimestampType.LOG_APPEND_TIME, 3L, logAppendTime,
+ pid, epoch, firstSequence);
builder.append(13L, null, "d".getBytes());
builder.append(14L, "4".getBytes(), "e".getBytes());
builder.append(15L, "5".getBytes(), "f".getBytes());
@@ -197,14 +283,14 @@ public class MemoryRecordsTest {
filtered.flip();
MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered);
- List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries());
- assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size());
+ List<MutableRecordBatch> batches = TestUtils.toList(filteredRecords.batches());
+ assertEquals(magic < RecordBatch.MAGIC_VALUE_V2 && compression == CompressionType.NONE ? 3 : 2, batches.size());
- for (LogEntry shallowEntry : shallowEntries) {
- assertEquals(compression, shallowEntry.record().compressionType());
- if (magic > Record.MAGIC_VALUE_V0) {
- assertEquals(TimestampType.LOG_APPEND_TIME, shallowEntry.record().timestampType());
- assertEquals(logAppendTime, shallowEntry.record().timestamp());
+ for (RecordBatch batch : batches) {
+ assertEquals(compression, batch.compressionType());
+ if (magic > RecordBatch.MAGIC_VALUE_V0) {
+ assertEquals(TimestampType.LOG_APPEND_TIME, batch.timestampType());
+ assertEquals(logAppendTime, batch.maxTimestamp());
}
}
}
@@ -213,16 +299,16 @@ public class MemoryRecordsTest {
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (long firstOffset : asList(0L, 57L))
- for (byte magic : asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
+ for (byte magic : asList(RecordBatch.MAGIC_VALUE_V0, RecordBatch.MAGIC_VALUE_V1, RecordBatch.MAGIC_VALUE_V2))
for (CompressionType type: CompressionType.values())
values.add(new Object[] {magic, firstOffset, type});
return values;
}
- private static class RetainNonNullKeysFilter implements MemoryRecords.LogEntryFilter {
+ private static class RetainNonNullKeysFilter implements MemoryRecords.RecordFilter {
@Override
- public boolean shouldRetain(LogEntry entry) {
- return entry.record().hasKey();
+ public boolean shouldRetain(Record record) {
+ return record.hasKey();
}
}
}
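The recurring mechanical change in this file is the iteration model: the old shallowEntries()/deepEntries() pair is replaced by batches() for batch-level metadata plus per-record iteration within each batch. A condensed sketch of the new traversal, as exercised by testIterator above:

    for (RecordBatch batch : memoryRecords.batches()) {
        // Batch-level metadata (offsets, timestamps, producer state) moved to RecordBatch.
        long baseOffset = batch.baseOffset();
        TimestampType timestampType = batch.timestampType();
        for (Record record : batch) {
            // Per-record fields remain on Record.
            long offset = record.offset();
            long timestamp = record.timestamp();
        }
    }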
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
deleted file mode 100644
index daf8a87..0000000
--- a/clients/src/test/java/org/apache/kafka/common/record/RecordTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-import java.nio.ByteBuffer;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.List;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
-
-@RunWith(value = Parameterized.class)
-public class RecordTest {
-
- private final byte magic;
- private final long timestamp;
- private final ByteBuffer key;
- private final ByteBuffer value;
- private final CompressionType compression;
- private final TimestampType timestampType;
- private final Record record;
-
- public RecordTest(byte magic, long timestamp, byte[] key, byte[] value, CompressionType compression) {
- this.magic = magic;
- this.timestamp = timestamp;
- this.timestampType = TimestampType.CREATE_TIME;
- this.key = key == null ? null : ByteBuffer.wrap(key);
- this.value = value == null ? null : ByteBuffer.wrap(value);
- this.compression = compression;
- this.record = Record.create(magic, timestamp, key, value, compression, timestampType);
- }
-
- @Test
- public void testFields() {
- assertEquals(compression, record.compressionType());
- assertEquals(key != null, record.hasKey());
- assertEquals(key, record.key());
- if (key != null)
- assertEquals(key.limit(), record.keySize());
- assertEquals(magic, record.magic());
- assertEquals(value, record.value());
- if (value != null)
- assertEquals(value.limit(), record.valueSize());
- if (magic > 0) {
- assertEquals(timestamp, record.timestamp());
- assertEquals(timestampType, record.timestampType());
- } else {
- assertEquals(Record.NO_TIMESTAMP, record.timestamp());
- assertEquals(TimestampType.NO_TIMESTAMP_TYPE, record.timestampType());
- }
- }
-
- @Test
- public void testChecksum() {
- assertEquals(record.checksum(), record.computeChecksum());
-
- byte attributes = Record.computeAttributes(magic, this.compression, TimestampType.CREATE_TIME);
- assertEquals(record.checksum(), Record.computeChecksum(
- magic,
- attributes,
- this.timestamp,
- this.key == null ? null : this.key.array(),
- this.value == null ? null : this.value.array()
- ));
- assertTrue(record.isValid());
- for (int i = Record.CRC_OFFSET + Record.CRC_LENGTH; i < record.sizeInBytes(); i++) {
- Record copy = copyOf(record);
- copy.buffer().put(i, (byte) 69);
- assertFalse(copy.isValid());
- try {
- copy.ensureValid();
- fail("Should fail the above test.");
- } catch (InvalidRecordException e) {
- // this is good
- }
- }
- }
-
- private Record copyOf(Record record) {
- ByteBuffer buffer = ByteBuffer.allocate(record.sizeInBytes());
- record.buffer().put(buffer);
- buffer.rewind();
- record.buffer().rewind();
- return new Record(buffer);
- }
-
- @Test
- public void testEquality() {
- assertEquals(record, copyOf(record));
- }
-
- @Parameters
- public static Collection<Object[]> data() {
- byte[] payload = new byte[1000];
- Arrays.fill(payload, (byte) 1);
- List<Object[]> values = new ArrayList<>();
- for (byte magic : Arrays.asList(Record.MAGIC_VALUE_V0, Record.MAGIC_VALUE_V1))
- for (long timestamp : Arrays.asList(Record.NO_TIMESTAMP, 0L, 1L))
- for (byte[] key : Arrays.asList(null, "".getBytes(), "key".getBytes(), payload))
- for (byte[] value : Arrays.asList(null, "".getBytes(), "value".getBytes(), payload))
- for (CompressionType compression : CompressionType.values())
- values.add(new Object[] {magic, timestamp, key, value, compression});
- return values;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
new file mode 100644
index 0000000..dd718bf
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/record/SimpleLegacyRecordTest.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import org.apache.kafka.common.utils.ByteBufferOutputStream;
+import org.apache.kafka.common.utils.Utils;
+import org.junit.Test;
+
+import java.io.DataOutputStream;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.fail;
+
+public class SimpleLegacyRecordTest {
+
+ @Test(expected = InvalidRecordException.class)
+ public void testCompressedIterationWithNullValue() throws Exception {
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
+ AbstractLegacyRecordBatch.writeHeader(out, 0L, LegacyRecord.RECORD_OVERHEAD_V1);
+ LegacyRecord.write(out, RecordBatch.MAGIC_VALUE_V1, 1L, (byte[]) null, null,
+ CompressionType.GZIP, TimestampType.CREATE_TIME);
+
+ buffer.flip();
+
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ for (Record record : records.records())
+ fail("Iteration should have caused invalid record error");
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testCompressedIterationWithEmptyRecords() throws Exception {
+ ByteBuffer emptyCompressedValue = ByteBuffer.allocate(64);
+ OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput(new ByteBufferOutputStream(emptyCompressedValue),
+ RecordBatch.MAGIC_VALUE_V1, 64);
+ gzipOutput.close();
+ emptyCompressedValue.flip();
+
+ ByteBuffer buffer = ByteBuffer.allocate(128);
+ DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
+ AbstractLegacyRecordBatch.writeHeader(out, 0L, LegacyRecord.RECORD_OVERHEAD_V1 + emptyCompressedValue.remaining());
+ LegacyRecord.write(out, RecordBatch.MAGIC_VALUE_V1, 1L, null, Utils.toArray(emptyCompressedValue),
+ CompressionType.GZIP, TimestampType.CREATE_TIME);
+
+ buffer.flip();
+
+ MemoryRecords records = MemoryRecords.readableRecords(buffer);
+ for (Record record : records.records())
+ fail("Iteration should have caused invalid record error");
+ }
+
+ /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
+ @Test(expected = InvalidRecordException.class)
+ public void testIsValidWithTooSmallBuffer() {
+ ByteBuffer buffer = ByteBuffer.allocate(2);
+ LegacyRecord record = new LegacyRecord(buffer);
+ assertFalse(record.isValid());
+ record.ensureValid();
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void testIsValidWithChecksumMismatch() {
+ ByteBuffer buffer = ByteBuffer.allocate(4);
+ // set checksum
+ buffer.putInt(2);
+ LegacyRecord record = new LegacyRecord(buffer);
+ assertFalse(record.isValid());
+ record.ensureValid();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java b/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
deleted file mode 100644
index aa77ca4..0000000
--- a/clients/src/test/java/org/apache/kafka/common/record/SimpleRecordTest.java
+++ /dev/null
@@ -1,144 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.apache.kafka.common.utils.ByteBufferOutputStream;
-import org.apache.kafka.common.utils.Utils;
-import org.junit.Test;
-
-import java.io.DataOutputStream;
-import java.io.OutputStream;
-import java.nio.ByteBuffer;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-
-public class SimpleRecordTest {
-
- @Test(expected = InvalidRecordException.class)
- public void testCompressedIterationWithNullValue() throws Exception {
- ByteBuffer buffer = ByteBuffer.allocate(128);
- DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
- LogEntry.writeHeader(out, 0L, Record.RECORD_OVERHEAD_V1);
- Record.write(out, Record.CURRENT_MAGIC_VALUE, 1L, null, null, CompressionType.GZIP, TimestampType.CREATE_TIME);
-
- buffer.flip();
-
- MemoryRecords records = MemoryRecords.readableRecords(buffer);
- for (Record record : records.records())
- fail("Iteration should have caused invalid record error");
- }
-
- @Test(expected = InvalidRecordException.class)
- public void testCompressedIterationWithEmptyRecords() throws Exception {
- ByteBuffer emptyCompressedValue = ByteBuffer.allocate(64);
- OutputStream gzipOutput = CompressionType.GZIP.wrapForOutput(new ByteBufferOutputStream(emptyCompressedValue),
- Record.MAGIC_VALUE_V1, 64);
- gzipOutput.close();
- emptyCompressedValue.flip();
-
- ByteBuffer buffer = ByteBuffer.allocate(128);
- DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(buffer));
- LogEntry.writeHeader(out, 0L, Record.RECORD_OVERHEAD_V1 + emptyCompressedValue.remaining());
- Record.write(out, Record.CURRENT_MAGIC_VALUE, 1L, null, Utils.toArray(emptyCompressedValue),
- CompressionType.GZIP, TimestampType.CREATE_TIME);
-
- buffer.flip();
-
- MemoryRecords records = MemoryRecords.readableRecords(buffer);
- for (Record record : records.records())
- fail("Iteration should have caused invalid record error");
- }
-
- /* This scenario can happen if the record size field is corrupt and we end up allocating a buffer that is too small */
- @Test(expected = InvalidRecordException.class)
- public void testIsValidWithTooSmallBuffer() {
- ByteBuffer buffer = ByteBuffer.allocate(2);
- Record record = new Record(buffer);
- assertFalse(record.isValid());
- record.ensureValid();
- }
-
- @Test(expected = InvalidRecordException.class)
- public void testIsValidWithChecksumMismatch() {
- ByteBuffer buffer = ByteBuffer.allocate(4);
- // set checksum
- buffer.putInt(2);
- Record record = new Record(buffer);
- assertFalse(record.isValid());
- record.ensureValid();
- }
-
- @Test
- public void testIsValidWithFourBytesBuffer() {
- ByteBuffer buffer = ByteBuffer.allocate(4);
- Record record = new Record(buffer);
- // it is a bit weird that we return `true` in this case, we could extend the definition of `isValid` to
- // something like the following to detect a clearly corrupt record:
- // return size() >= recordSize(0, 0) && checksum() == computeChecksum();
- assertTrue(record.isValid());
- // no exception should be thrown
- record.ensureValid();
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void cannotUpconvertWithNoTimestampType() {
- Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, "foo".getBytes(), "bar".getBytes());
- record.convert(Record.MAGIC_VALUE_V1, TimestampType.NO_TIMESTAMP_TYPE);
- }
-
- @Test
- public void testConvertFromV0ToV1() {
- byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
- byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
-
- for (int i = 0; i < keys.length; i++) {
- Record record = Record.create(Record.MAGIC_VALUE_V0, Record.NO_TIMESTAMP, keys[i], values[i]);
- Record converted = record.convert(Record.MAGIC_VALUE_V1, TimestampType.CREATE_TIME);
-
- assertEquals(Record.MAGIC_VALUE_V1, converted.magic());
- assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
- assertEquals(TimestampType.CREATE_TIME, converted.timestampType());
- assertEquals(record.key(), converted.key());
- assertEquals(record.value(), converted.value());
- assertTrue(record.isValid());
- assertEquals(record.convertedSize(Record.MAGIC_VALUE_V1), converted.sizeInBytes());
- }
- }
-
- @Test
- public void testConvertFromV1ToV0() {
- byte[][] keys = new byte[][] {"a".getBytes(), "".getBytes(), null, "b".getBytes()};
- byte[][] values = new byte[][] {"1".getBytes(), "".getBytes(), "2".getBytes(), null};
-
- for (int i = 0; i < keys.length; i++) {
- Record record = Record.create(Record.MAGIC_VALUE_V1, System.currentTimeMillis(), keys[i], values[i]);
- Record converted = record.convert(Record.MAGIC_VALUE_V0, TimestampType.NO_TIMESTAMP_TYPE);
-
- assertEquals(Record.MAGIC_VALUE_V0, converted.magic());
- assertEquals(Record.NO_TIMESTAMP, converted.timestamp());
- assertEquals(TimestampType.NO_TIMESTAMP_TYPE, converted.timestampType());
- assertEquals(record.key(), converted.key());
- assertEquals(record.value(), converted.value());
- assertTrue(record.isValid());
- assertEquals(record.convertedSize(Record.MAGIC_VALUE_V0), converted.sizeInBytes());
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java b/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
deleted file mode 100644
index c262758..0000000
--- a/clients/src/test/java/org/apache/kafka/common/record/TimestampTypeTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.common.record;
-
-import org.junit.Test;
-
-import static org.junit.Assert.assertEquals;
-
-public class TimestampTypeTest {
-
- @Test
- public void toAndFromAttributesCreateTime() {
- byte attributes = TimestampType.CREATE_TIME.updateAttributes((byte) 0);
- assertEquals(TimestampType.CREATE_TIME, TimestampType.forAttributes(attributes));
- }
-
- @Test
- public void toAndFromAttributesLogAppendTime() {
- byte attributes = TimestampType.LOG_APPEND_TIME.updateAttributes((byte) 0);
- assertEquals(TimestampType.LOG_APPEND_TIME, TimestampType.forAttributes(attributes));
- }
-
- @Test(expected = IllegalArgumentException.class)
- public void updateAttributesNotAllowedForNoTimestampType() {
- TimestampType.NO_TIMESTAMP_TYPE.updateAttributes((byte) 0);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
index 64bfdf5..b9fbf06 100644
--- a/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
@@ -28,8 +28,13 @@ import org.apache.kafka.common.protocol.ApiKeys;
import org.apache.kafka.common.protocol.Errors;
import org.apache.kafka.common.protocol.SecurityProtocol;
import org.apache.kafka.common.protocol.types.Struct;
+import org.apache.kafka.common.record.CompressionType;
+import org.apache.kafka.common.record.InvalidRecordException;
+import org.apache.kafka.common.record.SimpleRecord;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.MemoryRecords;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.MemoryRecordsBuilder;
+import org.apache.kafka.common.record.TimestampType;
import org.junit.Test;
import java.io.IOException;
@@ -63,9 +68,9 @@ public class RequestResponseTest {
checkRequest(createControlledShutdownRequest());
checkResponse(createControlledShutdownResponse(), 1);
checkErrorResponse(createControlledShutdownRequest(), new UnknownServerException());
- checkRequest(createFetchRequest(3));
- checkErrorResponse(createFetchRequest(3), new UnknownServerException());
- checkResponse(createFetchResponse(), 0);
+ checkRequest(createFetchRequest(4));
+ checkResponse(createFetchResponse(), 4);
+ checkErrorResponse(createFetchRequest(4), new UnknownServerException());
checkRequest(createHeartBeatRequest());
checkErrorResponse(createHeartBeatRequest(), new UnknownServerException());
checkResponse(createHeartBeatResponse(), 0);
@@ -103,8 +108,10 @@ public class RequestResponseTest {
checkErrorResponse(createOffsetFetchRequest(1), new UnknownServerException());
checkErrorResponse(createOffsetFetchRequest(2), new UnknownServerException());
checkResponse(createOffsetFetchResponse(), 0);
- checkRequest(createProduceRequest());
- checkErrorResponse(createProduceRequest(), new UnknownServerException());
+ checkRequest(createProduceRequest(2));
+ checkErrorResponse(createProduceRequest(2), new UnknownServerException());
+ checkRequest(createProduceRequest(3));
+ checkErrorResponse(createProduceRequest(3), new UnknownServerException());
checkResponse(createProduceResponse(), 2);
checkRequest(createStopReplicaRequest(true));
checkRequest(createStopReplicaRequest(false));
@@ -178,6 +185,7 @@ public class RequestResponseTest {
for (int i = 0; i < latestVersion; ++i) {
checkErrorResponse(createFetchRequest(i), new UnknownServerException());
checkRequest(createFetchRequest(i));
+ checkResponse(createFetchResponse(), i);
}
}
@@ -216,7 +224,7 @@ public class RequestResponseTest {
@Test
public void produceRequestToStringTest() {
- ProduceRequest request = createProduceRequest();
+ ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion());
assertEquals(1, request.partitionRecordsOrFail().size());
assertFalse(request.toString(false).contains("partitionSizes"));
assertTrue(request.toString(false).contains("numPartitions=1"));
@@ -240,7 +248,7 @@ public class RequestResponseTest {
@Test
public void produceRequestGetErrorResponseTest() {
- ProduceRequest request = createProduceRequest();
+ ProduceRequest request = createProduceRequest(ApiKeys.PRODUCE.latestVersion());
Set<TopicPartition> partitions = new HashSet<>(request.partitionRecordsOrFail().keySet());
ProduceResponse errorResponse = (ProduceResponse) request.getErrorResponse(new NotEnoughReplicasException());
@@ -248,7 +256,7 @@ public class RequestResponseTest {
ProduceResponse.PartitionResponse partitionResponse = errorResponse.responses().values().iterator().next();
assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
- assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+ assertEquals(RecordBatch.NO_TIMESTAMP, partitionResponse.logAppendTime);
request.clearPartitionRecords();
@@ -258,14 +266,14 @@ public class RequestResponseTest {
partitionResponse = errorResponse.responses().values().iterator().next();
assertEquals(Errors.NOT_ENOUGH_REPLICAS, partitionResponse.error);
assertEquals(ProduceResponse.INVALID_OFFSET, partitionResponse.baseOffset);
- assertEquals(Record.NO_TIMESTAMP, partitionResponse.logAppendTime);
+ assertEquals(RecordBatch.NO_TIMESTAMP, partitionResponse.logAppendTime);
}
@Test
public void produceResponseVersionTest() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
- 10000, Record.NO_TIMESTAMP));
+ 10000, RecordBatch.NO_TIMESTAMP));
ProduceResponse v0Response = new ProduceResponse(responseData);
ProduceResponse v1Response = new ProduceResponse(responseData, 10);
ProduceResponse v2Response = new ProduceResponse(responseData, 10);
@@ -283,12 +291,63 @@ public class RequestResponseTest {
assertEquals("Response data does not match", responseData, v2Response.responses());
}
+ @Test(expected = InvalidRecordException.class)
+ public void produceRequestV3ShouldContainOnlyOneRecordBatch() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, null, "a".getBytes());
+ builder.close();
+
+ builder = MemoryRecords.builder(buffer, CompressionType.NONE, TimestampType.CREATE_TIME, 1L);
+ builder.append(11L, "1".getBytes(), "b".getBytes());
+ builder.append(12L, null, "c".getBytes());
+ builder.close();
+
+ buffer.flip();
+
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(buffer));
+ new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void produceRequestV3CannotHaveNoRecordBatches() {
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), MemoryRecords.EMPTY);
+ new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void produceRequestV3CannotUseMagicV0() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, CompressionType.NONE,
+ TimestampType.NO_TIMESTAMP_TYPE, 0L);
+ builder.append(10L, null, "a".getBytes());
+
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), builder.build());
+ new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
+ }
+
+ @Test(expected = InvalidRecordException.class)
+ public void produceRequestV3CannotUseMagicV1() {
+ ByteBuffer buffer = ByteBuffer.allocate(256);
+ MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, CompressionType.NONE,
+ TimestampType.CREATE_TIME, 0L);
+ builder.append(10L, null, "a".getBytes());
+
+ Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
+ produceData.put(new TopicPartition("test", 0), builder.build());
+ new ProduceRequest.Builder(RecordBatch.CURRENT_MAGIC_VALUE, (short) 1, 5000, produceData).build().toStruct();
+ }
+
@Test
public void fetchResponseVersionTest() {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
- responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000, records));
+ responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000,
+ FetchResponse.INVALID_LSO, null, records));
FetchResponse v0Response = new FetchResponse(responseData, 0);
FetchResponse v1Response = new FetchResponse(responseData, 10);
@@ -383,8 +442,15 @@ public class RequestResponseTest {
private FetchResponse createFetchResponse() {
LinkedHashMap<TopicPartition, FetchResponse.PartitionData> responseData = new LinkedHashMap<>();
- MemoryRecords records = MemoryRecords.readableRecords(ByteBuffer.allocate(10));
- responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE, 1000000, records));
+ MemoryRecords records = MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("blah".getBytes()));
+ responseData.put(new TopicPartition("test", 0), new FetchResponse.PartitionData(Errors.NONE,
+ 1000000, FetchResponse.INVALID_LSO, null, records));
+
+ List<FetchResponse.AbortedTransaction> abortedTransactions = Collections.singletonList(
+ new FetchResponse.AbortedTransaction(234L, 999L));
+ responseData.put(new TopicPartition("test", 1), new FetchResponse.PartitionData(Errors.NONE,
+ 1000000, FetchResponse.INVALID_LSO, abortedTransactions, MemoryRecords.EMPTY));
+
return new FetchResponse(responseData, 25);
}
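
Context for the hunk above: the new FetchResponse.PartitionData shape introduced for KIP-98 places a last stable offset and an optional list of aborted transactions between the high watermark and the records. A minimal sketch (not itself part of this patch, and assuming the same imports as the test file) of a partition entry that reports one aborted transaction, using only constructors shown in this diff; the numeric values are illustrative:

    // Sketch: partition data carrying aborted-transaction metadata.
    // FetchResponse.INVALID_LSO marks the last stable offset as unknown.
    List<FetchResponse.AbortedTransaction> aborted =
        Collections.singletonList(new FetchResponse.AbortedTransaction(42L, 100L));
    FetchResponse.PartitionData data = new FetchResponse.PartitionData(
        Errors.NONE, 1000000, FetchResponse.INVALID_LSO, aborted, MemoryRecords.EMPTY);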
@@ -529,16 +595,20 @@ public class RequestResponseTest {
return new OffsetFetchResponse(Errors.NONE, responseData);
}
- private ProduceRequest createProduceRequest() {
- Map<TopicPartition, MemoryRecords> produceData = new HashMap<>();
- produceData.put(new TopicPartition("test", 0), MemoryRecords.readableRecords(ByteBuffer.allocate(10)));
- return new ProduceRequest.Builder((short) 1, 5000, produceData).build();
+ private ProduceRequest createProduceRequest(int version) {
+ if (version < 2)
+ throw new IllegalArgumentException("Produce request versions below 2 are not supported");
+
+ byte magic = version == 2 ? RecordBatch.MAGIC_VALUE_V1 : RecordBatch.MAGIC_VALUE_V2;
+ MemoryRecords records = MemoryRecords.withRecords(magic, CompressionType.NONE, new SimpleRecord("woot".getBytes()));
+ Map<TopicPartition, MemoryRecords> produceData = Collections.singletonMap(new TopicPartition("test", 0), records);
+ return new ProduceRequest.Builder(magic, (short) 1, 5000, produceData).build((short) version);
}
private ProduceResponse createProduceResponse() {
Map<TopicPartition, ProduceResponse.PartitionResponse> responseData = new HashMap<>();
responseData.put(new TopicPartition("test", 0), new ProduceResponse.PartitionResponse(Errors.NONE,
- 10000, Record.NO_TIMESTAMP));
+ 10000, RecordBatch.NO_TIMESTAMP));
return new ProduceResponse(responseData, 0);
}
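
The createProduceRequest change above reflects the new validation in ProduceRequest: from v3 onward a request must carry exactly one record batch, written with magic v2, as the four expected-InvalidRecordException tests enforce. A minimal sketch (not part of the patch, assuming the same imports as the test file) of building a valid v3 request with the APIs shown in this diff; the topic and payload are illustrative:

    // Exactly one magic-v2 batch per partition, as required for produce v3+.
    MemoryRecords records = MemoryRecords.withRecords(RecordBatch.MAGIC_VALUE_V2,
        CompressionType.NONE, new SimpleRecord("payload".getBytes()));
    Map<TopicPartition, MemoryRecords> produceData =
        Collections.singletonMap(new TopicPartition("topic", 0), records);
    ProduceRequest request = new ProduceRequest.Builder(RecordBatch.MAGIC_VALUE_V2,
        (short) 1, 5000, produceData).build((short) 3);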
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/utils/Crc32Test.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/Crc32Test.java b/clients/src/test/java/org/apache/kafka/common/utils/Crc32Test.java
new file mode 100644
index 0000000..adb5da7
--- /dev/null
+++ b/clients/src/test/java/org/apache/kafka/common/utils/Crc32Test.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.utils;
+
+import org.junit.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.Assert.assertEquals;
+
+public class Crc32Test {
+
+ @Test
+ public void testUpdateByteBuffer() {
+ byte[] bytes = new byte[]{0, 1, 2, 3, 4, 5};
+ doTestUpdateByteBuffer(bytes, ByteBuffer.allocate(bytes.length));
+ doTestUpdateByteBuffer(bytes, ByteBuffer.allocateDirect(bytes.length));
+ }
+
+ private void doTestUpdateByteBuffer(byte[] bytes, ByteBuffer buffer) {
+ buffer.put(bytes);
+ buffer.flip();
+ Crc32 bufferCrc = new Crc32();
+ bufferCrc.update(buffer, buffer.remaining());
+ assertEquals(Crc32.crc32(bytes), bufferCrc.getValue());
+ assertEquals(0, buffer.position());
+ }
+
+ @Test
+ public void testUpdateByteBufferWithOffsetPosition() {
+ byte[] bytes = new byte[]{-2, -1, 0, 1, 2, 3, 4, 5};
+ doTestUpdateByteBufferWithOffsetPosition(bytes, ByteBuffer.allocate(bytes.length), 2);
+ doTestUpdateByteBufferWithOffsetPosition(bytes, ByteBuffer.allocateDirect(bytes.length), 2);
+ }
+
+ private void doTestUpdateByteBufferWithOffsetPosition(byte[] bytes, ByteBuffer buffer, int offset) {
+ buffer.put(bytes);
+ buffer.flip();
+ buffer.position(offset);
+
+ Crc32 bufferCrc = new Crc32();
+ bufferCrc.update(buffer, buffer.remaining());
+ assertEquals(Crc32.crc32(bytes, offset, buffer.remaining()), bufferCrc.getValue());
+ assertEquals(offset, buffer.position());
+ }
+
+}
\ No newline at end of file
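The new Crc32Test pins down a property worth calling out: Crc32.update(ByteBuffer, int) checksums the buffer's remaining bytes without advancing its position, for both heap and direct buffers. A quick usage sketch (not part of the patch) built only from calls that appear in the test above; the payload is illustrative:

    // Checksum a buffer non-destructively; result matches the byte[] overload.
    byte[] payload = {0, 1, 2, 3};
    ByteBuffer buf = ByteBuffer.wrap(payload);
    Crc32 crc = new Crc32();
    crc.update(buf, buf.remaining());               // does not move the position
    assert crc.getValue() == Crc32.crc32(payload);
    assert buf.position() == 0;
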
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 7672335..16742d5 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -22,10 +22,12 @@ import org.easymock.IAnswer;
import org.junit.Test;
import java.io.Closeable;
+import java.io.DataOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
+import java.nio.charset.StandardCharsets;
import java.nio.file.StandardOpenOption;
import java.util.Arrays;
import java.util.Collections;
@@ -34,6 +36,7 @@ import java.util.Random;
import static org.apache.kafka.common.utils.Utils.formatAddress;
import static org.apache.kafka.common.utils.Utils.getHost;
import static org.apache.kafka.common.utils.Utils.getPort;
+import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -88,6 +91,95 @@ public class UtilsTest {
assertEquals(1, Utils.abs(-1));
}
+ @Test
+ public void writeToBuffer() throws IOException {
+ byte[] input = {0, 1, 2, 3, 4, 5};
+ ByteBuffer source = ByteBuffer.wrap(input);
+
+ doTestWriteToByteBuffer(source, ByteBuffer.allocate(input.length));
+ doTestWriteToByteBuffer(source, ByteBuffer.allocateDirect(input.length));
+ assertEquals(0, source.position());
+
+ source.position(2);
+ doTestWriteToByteBuffer(source, ByteBuffer.allocate(input.length));
+ doTestWriteToByteBuffer(source, ByteBuffer.allocateDirect(input.length));
+ }
+
+ private void doTestWriteToByteBuffer(ByteBuffer source, ByteBuffer dest) throws IOException {
+ int numBytes = source.remaining();
+ int position = source.position();
+ DataOutputStream out = new DataOutputStream(new ByteBufferOutputStream(dest));
+ Utils.writeTo(out, source, source.remaining());
+ dest.flip();
+ assertEquals(numBytes, dest.remaining());
+ assertEquals(position, source.position());
+ assertEquals(source, dest);
+ }
+
+ @Test
+ public void toArray() {
+ byte[] input = {0, 1, 2, 3, 4};
+ ByteBuffer buffer = ByteBuffer.wrap(input);
+ assertArrayEquals(input, Utils.toArray(buffer));
+ assertEquals(0, buffer.position());
+
+ assertArrayEquals(new byte[] {1, 2}, Utils.toArray(buffer, 1, 2));
+ assertEquals(0, buffer.position());
+
+ buffer.position(2);
+ assertArrayEquals(new byte[] {2, 3, 4}, Utils.toArray(buffer));
+ assertEquals(2, buffer.position());
+ }
+
+ @Test
+ public void toArrayDirectByteBuffer() {
+ byte[] input = {0, 1, 2, 3, 4};
+ ByteBuffer buffer = ByteBuffer.allocateDirect(5);
+ buffer.put(input);
+ buffer.rewind();
+
+ assertArrayEquals(input, Utils.toArray(buffer));
+ assertEquals(0, buffer.position());
+
+ assertArrayEquals(new byte[] {1, 2}, Utils.toArray(buffer, 1, 2));
+ assertEquals(0, buffer.position());
+
+ buffer.position(2);
+ assertArrayEquals(new byte[] {2, 3, 4}, Utils.toArray(buffer));
+ assertEquals(2, buffer.position());
+ }
+
+ @Test
+ public void utf8ByteArraySerde() {
+ String utf8String = "A\u00ea\u00f1\u00fcC";
+ byte[] utf8Bytes = utf8String.getBytes(StandardCharsets.UTF_8);
+ assertArrayEquals(utf8Bytes, Utils.utf8(utf8String));
+ assertEquals(utf8Bytes.length, Utils.utf8Length(utf8String));
+ assertEquals(utf8String, Utils.utf8(utf8Bytes));
+ }
+
+ @Test
+ public void utf8ByteBufferSerde() {
+ doTestUtf8ByteBuffer(ByteBuffer.allocate(20));
+ doTestUtf8ByteBuffer(ByteBuffer.allocateDirect(20));
+ }
+
+ private void doTestUtf8ByteBuffer(ByteBuffer utf8Buffer) {
+ String utf8String = "A\u00ea\u00f1\u00fcC";
+ byte[] utf8Bytes = utf8String.getBytes(StandardCharsets.UTF_8);
+
+ utf8Buffer.position(4);
+ utf8Buffer.put(utf8Bytes);
+
+ utf8Buffer.position(4);
+ assertEquals(utf8String, Utils.utf8(utf8Buffer, utf8Bytes.length));
+ assertEquals(4, utf8Buffer.position());
+
+ utf8Buffer.position(0);
+ assertEquals(utf8String, Utils.utf8(utf8Buffer, 4, utf8Bytes.length));
+ assertEquals(0, utf8Buffer.position());
+ }
+
private void subTest(ByteBuffer buffer) {
// The first byte should be 'A'
assertEquals('A', (Utils.readBytes(buffer, 0, 1))[0]);
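
The utf8 buffer tests above document that both overloads are position-preserving: Utils.utf8(buffer, length) decodes from the current position, while Utils.utf8(buffer, index, length) decodes from an absolute index, and neither moves the position. A minimal sketch (not part of the patch, assuming the same imports as the test file) with an illustrative payload:

    ByteBuffer buf = ByteBuffer.allocate(16);
    buf.put("key:value".getBytes(StandardCharsets.UTF_8));
    buf.position(4);
    String value = Utils.utf8(buf, 5);    // "value"; position is still 4
    String key = Utils.utf8(buf, 0, 3);   // "key"; position is still 4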
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
index 975d423..5ab8c9c 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/WorkerGroupMember.java
@@ -16,6 +16,7 @@
*/
package org.apache.kafka.connect.runtime.distributed;
+import org.apache.kafka.clients.ApiVersions;
import org.apache.kafka.clients.ClientUtils;
import org.apache.kafka.clients.CommonClientConfigs;
import org.apache.kafka.clients.Metadata;
@@ -101,7 +102,8 @@ public class WorkerGroupMember {
config.getInt(CommonClientConfigs.RECEIVE_BUFFER_CONFIG),
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG),
time,
- true);
+ true,
+ new ApiVersions());
this.client = new ConsumerNetworkClient(netClient, metadata, time, retryBackoffMs,
config.getInt(CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG));
this.coordinator = new WorkerCoordinator(this.client,
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
index dabb347..913ae1f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/ConnectUtils.java
@@ -17,13 +17,13 @@
package org.apache.kafka.connect.util;
import org.apache.kafka.common.record.InvalidRecordException;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
public final class ConnectUtils {
public static Long checkAndConvertTimestamp(Long timestamp) {
if (timestamp == null || timestamp >= 0)
return timestamp;
- else if (timestamp == Record.NO_TIMESTAMP)
+ else if (timestamp == RecordBatch.NO_TIMESTAMP)
return null;
else
throw new InvalidRecordException(String.format("Invalid record timestamp %d", timestamp));
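
After the rename, checkAndConvertTimestamp still has three outcomes: null or non-negative timestamps pass through unchanged, the RecordBatch.NO_TIMESTAMP sentinel (-1) maps to null, and any other negative value is rejected. A quick sketch (not part of the patch) of the three cases:

    Long ts = ConnectUtils.checkAndConvertTimestamp(123L);   // 123L, unchanged
    Long none = ConnectUtils.checkAndConvertTimestamp(
        RecordBatch.NO_TIMESTAMP);                           // null (sentinel)
    ConnectUtils.checkAndConvertTimestamp(-2L);              // throws InvalidRecordException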
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
----------------------------------------------------------------------
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 47ce1fc..26ac486 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -24,7 +24,7 @@ import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.consumer.OffsetCommitCallback;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.WakeupException;
-import org.apache.kafka.common.record.Record;
+import org.apache.kafka.common.record.RecordBatch;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.connect.data.Schema;
@@ -533,7 +533,7 @@ public class WorkerSinkTaskTest {
@Test
public void testMissingTimestampPropagation() throws Exception {
expectInitializeTask();
- expectConsumerPoll(1, Record.NO_TIMESTAMP, TimestampType.CREATE_TIME);
+ expectConsumerPoll(1, RecordBatch.NO_TIMESTAMP, TimestampType.CREATE_TIME);
expectConversionAndTransformation(1);
Capture<Collection<SinkRecord>> records = EasyMock.newCapture(CaptureType.ALL);
@@ -662,7 +662,7 @@ public class WorkerSinkTaskTest {
}
private void expectConsumerPoll(final int numMessages) {
- expectConsumerPoll(numMessages, Record.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
+ expectConsumerPoll(numMessages, RecordBatch.NO_TIMESTAMP, TimestampType.NO_TIMESTAMP_TYPE);
}
private void expectConsumerPoll(final int numMessages, final long timestamp, final TimestampType timestampType) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/admin/AdminClient.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/admin/AdminClient.scala b/core/src/main/scala/kafka/admin/AdminClient.scala
index 4b28460..45ba58b 100644
--- a/core/src/main/scala/kafka/admin/AdminClient.scala
+++ b/core/src/main/scala/kafka/admin/AdminClient.scala
@@ -293,7 +293,8 @@ object AdminClient {
DefaultReceiveBufferBytes,
DefaultRequestTimeoutMs,
time,
- true)
+ true,
+ new ApiVersions)
val highLevelClient = new ConsumerNetworkClient(
networkClient,
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/api/ApiVersion.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/ApiVersion.scala b/core/src/main/scala/kafka/api/ApiVersion.scala
index 730f313..2ed6452 100644
--- a/core/src/main/scala/kafka/api/ApiVersion.scala
+++ b/core/src/main/scala/kafka/api/ApiVersion.scala
@@ -17,7 +17,7 @@
package kafka.api
-import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.RecordBatch
/**
* This class contains the different Kafka versions.
@@ -62,7 +62,10 @@ object ApiVersion {
"0.10.1" -> KAFKA_0_10_1_IV2,
// introduced UpdateMetadataRequest v3 in KIP-103
"0.10.2-IV0" -> KAFKA_0_10_2_IV0,
- "0.10.2" -> KAFKA_0_10_2_IV0
+ "0.10.2" -> KAFKA_0_10_2_IV0,
+ // KIP-98 (idempotent and transactional producer support)
+ "0.11.0-IV0" -> KAFKA_0_11_0_IV0,
+ "0.11.0" -> KAFKA_0_11_0_IV0
)
private val versionPattern = "\\.".r
@@ -89,60 +92,66 @@ sealed trait ApiVersion extends Ordered[ApiVersion] {
// Keep the IDs in order of versions
case object KAFKA_0_8_0 extends ApiVersion {
val version: String = "0.8.0.X"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
val id: Int = 0
}
case object KAFKA_0_8_1 extends ApiVersion {
val version: String = "0.8.1.X"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
val id: Int = 1
}
case object KAFKA_0_8_2 extends ApiVersion {
val version: String = "0.8.2.X"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
val id: Int = 2
}
case object KAFKA_0_9_0 extends ApiVersion {
val version: String = "0.9.0.X"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V0
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V0
val id: Int = 3
}
case object KAFKA_0_10_0_IV0 extends ApiVersion {
val version: String = "0.10.0-IV0"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val id: Int = 4
}
case object KAFKA_0_10_0_IV1 extends ApiVersion {
val version: String = "0.10.0-IV1"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val id: Int = 5
}
case object KAFKA_0_10_1_IV0 extends ApiVersion {
val version: String = "0.10.1-IV0"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val id: Int = 6
}
case object KAFKA_0_10_1_IV1 extends ApiVersion {
val version: String = "0.10.1-IV1"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val id: Int = 7
}
case object KAFKA_0_10_1_IV2 extends ApiVersion {
val version: String = "0.10.1-IV2"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val id: Int = 8
}
case object KAFKA_0_10_2_IV0 extends ApiVersion {
val version: String = "0.10.2-IV0"
- val messageFormatVersion: Byte = Record.MAGIC_VALUE_V1
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V1
val id: Int = 9
}
+
+case object KAFKA_0_11_0_IV0 extends ApiVersion {
+ val version: String = "0.11.0-IV0"
+ val messageFormatVersion: Byte = RecordBatch.MAGIC_VALUE_V2
+ val id: Int = 10
+}
http://git-wip-us.apache.org/repos/asf/kafka/blob/5bd06f1d/core/src/main/scala/kafka/api/FetchRequest.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/api/FetchRequest.scala b/core/src/main/scala/kafka/api/FetchRequest.scala
index f049821..f91a3c3 100644
--- a/core/src/main/scala/kafka/api/FetchRequest.scala
+++ b/core/src/main/scala/kafka/api/FetchRequest.scala
@@ -203,7 +203,8 @@ case class FetchRequest(versionId: Short = FetchRequest.CurrentVersion,
val responseData = new util.LinkedHashMap[TopicPartition, JFetchResponse.PartitionData]
requestInfo.foreach { case (TopicAndPartition(topic, partition), _) =>
responseData.put(new TopicPartition(topic, partition),
- new JFetchResponse.PartitionData(Errors.forException(e), -1, MemoryRecords.EMPTY))
+ new JFetchResponse.PartitionData(Errors.forException(e), JFetchResponse.INVALID_HIGHWATERMARK,
+ JFetchResponse.INVALID_LSO, null, MemoryRecords.EMPTY))
}
val errorResponse = new JFetchResponse(responseData, 0)
// Magic value does not matter here because the message set is empty