Posted to commits@kafka.apache.org by jg...@apache.org on 2017/05/31 21:14:19 UTC
kafka git commit: KAFKA-5093; Avoid loading full batch data when possible when iterating FileRecords
Repository: kafka
Updated Branches:
refs/heads/trunk da9a171c9 -> 81f0c1e8f
KAFKA-5093; Avoid loading full batch data when possible when iterating FileRecords
Author: Jason Gustafson <ja...@confluent.io>
Reviewers: Ismael Juma <is...@juma.me.uk>
Closes #3160 from hachikuji/KAFKA-5093
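
The gist of the change: FileChannelRecordBatch now remembers only the file position, size and magic byte of each batch, answers header-level queries (lastOffset, producerId, isTransactional, ...) by reading just the batch header, and defers reading the full batch until the records themselves are iterated. A minimal, self-contained sketch of that caching pattern (illustrative names only, not the actual Kafka classes):

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;

// Illustrative sketch of the lazy-loading pattern used by this patch; not the Kafka API.
abstract class LazyFileBatch {
    protected final FileChannel channel;
    protected final int position;   // start of the batch within the file
    protected final int batchSize;  // total size of the batch in bytes

    private ByteBuffer header;      // header-only bytes, loaded on demand
    private ByteBuffer full;        // full batch bytes, loaded on demand

    LazyFileBatch(FileChannel channel, int position, int batchSize) {
        this.channel = channel;
        this.position = position;
        this.batchSize = batchSize;
    }

    // Size of the fixed header for this batch format (format-specific).
    protected abstract int headerSize();

    // Header-level queries (last offset, producer id, ...) only need the header bytes.
    protected ByteBuffer loadHeader() throws IOException {
        if (full != null)
            return full;             // the full batch already contains the header
        if (header == null)
            header = read(headerSize());
        return header;
    }

    // Iterating the records forces the whole batch into memory.
    protected ByteBuffer loadFull() throws IOException {
        if (full == null) {
            header = null;           // no need to keep the partial copy around
            full = read(batchSize);
        }
        return full;
    }

    private ByteBuffer read(int size) throws IOException {
        ByteBuffer buffer = ByteBuffer.allocate(size);
        long pos = position;
        while (buffer.hasRemaining()) {
            int n = channel.read(buffer, pos);
            if (n < 0)
                throw new IOException("Unexpected end of file while reading batch");
            pos += n;
        }
        buffer.flip();
        return buffer;
    }
}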
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/81f0c1e8
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/81f0c1e8
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/81f0c1e8
Branch: refs/heads/trunk
Commit: 81f0c1e8f2ba2d86f061361b5ee33bb8e6f640c5
Parents: da9a171
Author: Jason Gustafson <ja...@confluent.io>
Authored: Wed May 31 14:11:47 2017 -0700
Committer: Jason Gustafson <ja...@confluent.io>
Committed: Wed May 31 14:11:47 2017 -0700
----------------------------------------------------------------------
.../record/AbstractLegacyRecordBatch.java | 73 ++++++
.../kafka/common/record/DefaultRecordBatch.java | 77 +++++++
.../kafka/common/record/FileLogInputStream.java | 216 +++++++-----------
.../apache/kafka/common/record/FileRecords.java | 4 +-
.../kafka/common/record/LegacyRecord.java | 43 ++--
.../kafka/common/record/MemoryRecords.java | 11 +-
.../org/apache/kafka/common/record/Records.java | 2 +
.../clients/consumer/internals/FetcherTest.java | 2 +-
.../common/record/FileLogInputStreamTest.java | 223 ++++++++++++++++++-
.../kafka/common/record/FileRecordsTest.java | 6 +-
.../common/record/MemoryRecordsBuilderTest.java | 2 +-
.../kafka/message/ByteBufferMessageSet.scala | 4 +-
.../scala/kafka/message/MessageAndOffset.scala | 24 +-
13 files changed, 504 insertions(+), 183 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
index e4938be..9b74d06 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractLegacyRecordBatch.java
@@ -29,6 +29,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayDeque;
import java.util.NoSuchElementException;
@@ -510,4 +511,76 @@ public abstract class AbstractLegacyRecordBatch extends AbstractRecordBatch impl
}
}
+ static class LegacyFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {
+
+ LegacyFileChannelRecordBatch(long offset,
+ byte magic,
+ FileChannel channel,
+ int position,
+ int batchSize) {
+ super(offset, magic, channel, position, batchSize);
+ }
+
+ @Override
+ protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
+ return new ByteBufferLegacyRecordBatch(buffer);
+ }
+
+ @Override
+ public long baseOffset() {
+ return loadFullBatch().baseOffset();
+ }
+
+ @Override
+ public long lastOffset() {
+ return offset;
+ }
+
+ @Override
+ public long producerId() {
+ return RecordBatch.NO_PRODUCER_ID;
+ }
+
+ @Override
+ public short producerEpoch() {
+ return RecordBatch.NO_PRODUCER_EPOCH;
+ }
+
+ @Override
+ public int baseSequence() {
+ return RecordBatch.NO_SEQUENCE;
+ }
+
+ @Override
+ public int lastSequence() {
+ return RecordBatch.NO_SEQUENCE;
+ }
+
+ @Override
+ public Integer countOrNull() {
+ return null;
+ }
+
+ @Override
+ public boolean isTransactional() {
+ return false;
+ }
+
+ @Override
+ public boolean isControlBatch() {
+ return false;
+ }
+
+ @Override
+ public int partitionLeaderEpoch() {
+ return RecordBatch.NO_PARTITION_LEADER_EPOCH;
+ }
+
+ @Override
+ protected int headerSize() {
+ return LOG_OVERHEAD + LegacyRecord.headerSize(magic);
+ }
+
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
index 2bf889f..bdba860 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/DefaultRecordBatch.java
@@ -26,6 +26,7 @@ import org.apache.kafka.common.utils.Crc32C;
import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
@@ -498,4 +499,80 @@ public class DefaultRecordBatch extends AbstractRecordBatch implements MutableRe
}
+ static class DefaultFileChannelRecordBatch extends FileLogInputStream.FileChannelRecordBatch {
+
+ DefaultFileChannelRecordBatch(long offset,
+ byte magic,
+ FileChannel channel,
+ int position,
+ int batchSize) {
+ super(offset, magic, channel, position, batchSize);
+ }
+
+ @Override
+ protected RecordBatch toMemoryRecordBatch(ByteBuffer buffer) {
+ return new DefaultRecordBatch(buffer);
+ }
+
+ @Override
+ public long baseOffset() {
+ return offset;
+ }
+
+ @Override
+ public long lastOffset() {
+ return loadBatchHeader().lastOffset();
+ }
+
+ @Override
+ public long producerId() {
+ return loadBatchHeader().producerId();
+ }
+
+ @Override
+ public short producerEpoch() {
+ return loadBatchHeader().producerEpoch();
+ }
+
+ @Override
+ public int baseSequence() {
+ return loadBatchHeader().baseSequence();
+ }
+
+ @Override
+ public int lastSequence() {
+ return loadBatchHeader().lastSequence();
+ }
+
+ @Override
+ public long checksum() {
+ return loadBatchHeader().checksum();
+ }
+
+ @Override
+ public Integer countOrNull() {
+ return loadBatchHeader().countOrNull();
+ }
+
+ @Override
+ public boolean isTransactional() {
+ return loadBatchHeader().isTransactional();
+ }
+
+ @Override
+ public boolean isControlBatch() {
+ return loadBatchHeader().isControlBatch();
+ }
+
+ @Override
+ public int partitionLeaderEpoch() {
+ return loadBatchHeader().partitionLeaderEpoch();
+ }
+
+ @Override
+ protected int headerSize() {
+ return RECORD_BATCH_OVERHEAD;
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
index 57fec4f..75eb1b3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileLogInputStream.java
@@ -18,6 +18,8 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.CorruptRecordException;
+import org.apache.kafka.common.record.AbstractLegacyRecordBatch.LegacyFileChannelRecordBatch;
+import org.apache.kafka.common.record.DefaultRecordBatch.DefaultFileChannelRecordBatch;
import org.apache.kafka.common.utils.CloseableIterator;
import org.apache.kafka.common.utils.Utils;
@@ -27,6 +29,10 @@ import java.nio.channels.FileChannel;
import java.util.Iterator;
import static org.apache.kafka.common.record.Records.LOG_OVERHEAD;
+import static org.apache.kafka.common.record.Records.HEADER_SIZE_UP_TO_MAGIC;
+import static org.apache.kafka.common.record.Records.MAGIC_OFFSET;
+import static org.apache.kafka.common.record.Records.OFFSET_OFFSET;
+import static org.apache.kafka.common.record.Records.SIZE_OFFSET;
/**
* A log input stream which is backed by a {@link FileChannel}.
@@ -35,7 +41,7 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
private int position;
private final int end;
private final FileChannel channel;
- private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(LOG_OVERHEAD);
+ private final ByteBuffer logHeaderBuffer = ByteBuffer.allocate(HEADER_SIZE_UP_TO_MAGIC);
/**
* Create a new log input stream over the FileChannel
@@ -53,15 +59,15 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
@Override
public FileChannelRecordBatch nextBatch() throws IOException {
- if (position + LOG_OVERHEAD >= end)
+ if (position + HEADER_SIZE_UP_TO_MAGIC >= end)
return null;
logHeaderBuffer.rewind();
Utils.readFullyOrFail(channel, logHeaderBuffer, position, "log header");
logHeaderBuffer.rewind();
- long offset = logHeaderBuffer.getLong();
- int size = logHeaderBuffer.getInt();
+ long offset = logHeaderBuffer.getLong(OFFSET_OFFSET);
+ int size = logHeaderBuffer.getInt(SIZE_OFFSET);
// V0 has the smallest overhead, stricter checking is done later
if (size < LegacyRecord.RECORD_OVERHEAD_V0)
@@ -70,7 +76,14 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
if (position + LOG_OVERHEAD + size > end)
return null;
- FileChannelRecordBatch batch = new FileChannelRecordBatch(offset, channel, position, size);
+ byte magic = logHeaderBuffer.get(MAGIC_OFFSET);
+ final FileChannelRecordBatch batch;
+
+ if (magic < RecordBatch.MAGIC_VALUE_V2)
+ batch = new LegacyFileChannelRecordBatch(offset, magic, channel, position, size);
+ else
+ batch = new DefaultFileChannelRecordBatch(offset, magic, channel, position, size);
+
position += batch.sizeInBytes();
return batch;
}
@@ -80,71 +93,46 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
* without needing to read the record data into memory until it is needed. The downside
* is that entries will generally no longer be readable when the underlying channel is closed.
*/
- public static class FileChannelRecordBatch extends AbstractRecordBatch {
- private final long offset;
- private final FileChannel channel;
- private final int position;
- private final int batchSize;
- private RecordBatch underlying;
- private Byte magic;
-
- private FileChannelRecordBatch(long offset,
- FileChannel channel,
- int position,
- int batchSize) {
+ public abstract static class FileChannelRecordBatch extends AbstractRecordBatch {
+ protected final long offset;
+ protected final byte magic;
+ protected final FileChannel channel;
+ protected final int position;
+ protected final int batchSize;
+
+ private RecordBatch fullBatch;
+ private RecordBatch batchHeader;
+
+ FileChannelRecordBatch(long offset,
+ byte magic,
+ FileChannel channel,
+ int position,
+ int batchSize) {
this.offset = offset;
+ this.magic = magic;
this.channel = channel;
this.position = position;
this.batchSize = batchSize;
}
@Override
- public long baseOffset() {
- if (magic() >= RecordBatch.MAGIC_VALUE_V2)
- return offset;
-
- loadUnderlyingRecordBatch();
- return underlying.baseOffset();
- }
-
- @Override
public CompressionType compressionType() {
- loadUnderlyingRecordBatch();
- return underlying.compressionType();
+ return loadBatchHeader().compressionType();
}
@Override
public TimestampType timestampType() {
- loadUnderlyingRecordBatch();
- return underlying.timestampType();
+ return loadBatchHeader().timestampType();
}
@Override
- public long maxTimestamp() {
- loadUnderlyingRecordBatch();
- return underlying.maxTimestamp();
+ public long checksum() {
+ return loadBatchHeader().checksum();
}
@Override
- public long lastOffset() {
- if (magic() < RecordBatch.MAGIC_VALUE_V2)
- return offset;
- else if (underlying != null)
- return underlying.lastOffset();
-
- try {
- // TODO: this logic probably should be moved into DefaultRecordBatch somehow
- // maybe we just need two separate implementations
-
- byte[] offsetDelta = new byte[4];
- ByteBuffer buf = ByteBuffer.wrap(offsetDelta);
- channel.read(buf, position + DefaultRecordBatch.LAST_OFFSET_DELTA_OFFSET);
- if (buf.hasRemaining())
- throw new KafkaException("Failed to read magic byte from FileChannel " + channel);
- return offset + buf.getInt(0);
- } catch (IOException e) {
- throw new KafkaException(e);
- }
+ public long maxTimestamp() {
+ return loadBatchHeader().maxTimestamp();
}
public int position() {
@@ -153,92 +141,27 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
@Override
public byte magic() {
- if (magic != null)
- return magic;
- if (underlying != null)
- return underlying.magic();
-
- try {
- ByteBuffer buf = ByteBuffer.wrap(new byte[1]);
- Utils.readFullyOrFail(channel, buf, position + Records.MAGIC_OFFSET, "magic byte");
- magic = buf.get(0);
- return magic;
- } catch (IOException e) {
- throw new KafkaException(e);
- }
- }
-
- @Override
- public long producerId() {
- loadUnderlyingRecordBatch();
- return underlying.producerId();
- }
-
- @Override
- public short producerEpoch() {
- loadUnderlyingRecordBatch();
- return underlying.producerEpoch();
- }
-
- @Override
- public int baseSequence() {
- loadUnderlyingRecordBatch();
- return underlying.baseSequence();
- }
-
- @Override
- public int lastSequence() {
- loadUnderlyingRecordBatch();
- return underlying.lastSequence();
- }
-
- private void loadUnderlyingRecordBatch() {
- try {
- if (underlying != null)
- return;
-
- ByteBuffer batchBuffer = ByteBuffer.allocate(sizeInBytes());
- Utils.readFullyOrFail(channel, batchBuffer, position, "full record batch");
- batchBuffer.rewind();
-
- byte magic = batchBuffer.get(Records.MAGIC_OFFSET);
- if (magic > RecordBatch.MAGIC_VALUE_V1)
- underlying = new DefaultRecordBatch(batchBuffer);
- else
- underlying = new AbstractLegacyRecordBatch.ByteBufferLegacyRecordBatch(batchBuffer);
- } catch (IOException e) {
- throw new KafkaException("Failed to load record batch at position " + position + " from file channel " + channel);
- }
+ return magic;
}
@Override
public Iterator<Record> iterator() {
- loadUnderlyingRecordBatch();
- return underlying.iterator();
+ return loadFullBatch().iterator();
}
@Override
public CloseableIterator<Record> streamingIterator(BufferSupplier bufferSupplier) {
- loadUnderlyingRecordBatch();
- return underlying.streamingIterator(bufferSupplier);
+ return loadFullBatch().streamingIterator(bufferSupplier);
}
@Override
public boolean isValid() {
- loadUnderlyingRecordBatch();
- return underlying.isValid();
+ return loadFullBatch().isValid();
}
@Override
public void ensureValid() {
- loadUnderlyingRecordBatch();
- underlying.ensureValid();
- }
-
- @Override
- public long checksum() {
- loadUnderlyingRecordBatch();
- return underlying.checksum();
+ loadFullBatch().ensureValid();
}
@Override
@@ -247,12 +170,6 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
}
@Override
- public Integer countOrNull() {
- loadUnderlyingRecordBatch();
- return underlying.countOrNull();
- }
-
- @Override
public void writeTo(ByteBuffer buffer) {
try {
int limit = buffer.limit();
@@ -265,22 +182,37 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
}
}
- @Override
- public boolean isTransactional() {
- loadUnderlyingRecordBatch();
- return underlying.isTransactional();
+ protected abstract RecordBatch toMemoryRecordBatch(ByteBuffer buffer);
+
+ protected abstract int headerSize();
+
+ protected RecordBatch loadFullBatch() {
+ if (fullBatch == null) {
+ batchHeader = null;
+ fullBatch = loadBatchWithSize(sizeInBytes(), "full record batch");
+ }
+ return fullBatch;
}
- @Override
- public boolean isControlBatch() {
- loadUnderlyingRecordBatch();
- return underlying.isControlBatch();
+ protected RecordBatch loadBatchHeader() {
+ if (fullBatch != null)
+ return fullBatch;
+
+ if (batchHeader == null)
+ batchHeader = loadBatchWithSize(headerSize(), "record batch header");
+
+ return batchHeader;
}
- @Override
- public int partitionLeaderEpoch() {
- loadUnderlyingRecordBatch();
- return underlying.partitionLeaderEpoch();
+ private RecordBatch loadBatchWithSize(int size, String description) {
+ try {
+ ByteBuffer buffer = ByteBuffer.allocate(size);
+ Utils.readFullyOrFail(channel, buffer, position, description);
+ buffer.rewind();
+ return toMemoryRecordBatch(buffer);
+ } catch (IOException e) {
+ throw new KafkaException(e);
+ }
}
@Override
@@ -309,7 +241,9 @@ public class FileLogInputStream implements LogInputStream<FileLogInputStream.Fil
@Override
public String toString() {
- return "FileChannelRecordBatch(magic: " + magic() + ", offsets: [" + baseOffset() + ", " + lastOffset() + "])";
+ return "FileChannelRecordBatch(magic: " + magic +
+ ", offset: " + offset +
+ ", size: " + batchSize + ")";
}
}
}
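
As the FileChannelRecordBatch javadoc above says, callers can now inspect batch-level metadata without pulling record data into memory. A hedged usage sketch against the classes touched in this diff (the segment path is hypothetical, error handling omitted); for magic v2 batches the offsets and producer fields come from the fixed-size batch header, and only iterating the records triggers a full read:

import java.io.File;
import java.io.IOException;

import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;

public class SegmentHeaderScan {
    public static void main(String[] args) throws IOException {
        // Hypothetical segment file; point this at a real log segment.
        File segment = new File("00000000000000000000.log");
        try (FileRecords fileRecords = FileRecords.open(segment)) {
            for (FileChannelRecordBatch batch : fileRecords.batches()) {
                // For magic v2 these calls read only the batch header; the
                // record data stays on disk unless batch.iterator() is used.
                System.out.println("offsets=[" + batch.baseOffset() + ", " + batch.lastOffset()
                        + "] producerId=" + batch.producerId()
                        + " sizeInBytes=" + batch.sizeInBytes());
            }
        }
    }
}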
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index a72ba8b..32ca1a7 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -202,7 +202,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
public void renameTo(File f) throws IOException {
try {
Utils.atomicMoveWithFallback(file.toPath(), f.toPath());
- } finally {
+ } finally {
this.file = f;
}
}
@@ -391,7 +391,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
* @param mutable mutable
* @param fileAlreadyExists File already exists or not
* @param initFileSize The size used for pre allocate file, for example 512 * 1025 *1024
- * @param preallocate Pre allocate file or not, gotten from configuration.
+ * @param preallocate Pre-allocate file or not, gotten from configuration.
*/
private static FileChannel openChannel(File file,
boolean mutable,
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
index 25185b0..482c4a6 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LegacyRecord.java
@@ -45,10 +45,10 @@ public final class LegacyRecord {
public static final int MAGIC_OFFSET = CRC_OFFSET + CRC_LENGTH;
public static final int MAGIC_LENGTH = 1;
public static final int ATTRIBUTES_OFFSET = MAGIC_OFFSET + MAGIC_LENGTH;
- public static final int ATTRIBUTE_LENGTH = 1;
- public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+ public static final int ATTRIBUTES_LENGTH = 1;
+ public static final int TIMESTAMP_OFFSET = ATTRIBUTES_OFFSET + ATTRIBUTES_LENGTH;
public static final int TIMESTAMP_LENGTH = 8;
- public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTE_LENGTH;
+ public static final int KEY_SIZE_OFFSET_V0 = ATTRIBUTES_OFFSET + ATTRIBUTES_LENGTH;
public static final int KEY_SIZE_OFFSET_V1 = TIMESTAMP_OFFSET + TIMESTAMP_LENGTH;
public static final int KEY_SIZE_LENGTH = 4;
public static final int KEY_OFFSET_V0 = KEY_SIZE_OFFSET_V0 + KEY_SIZE_LENGTH;
@@ -58,17 +58,18 @@ public final class LegacyRecord {
/**
* The size for the record header
*/
- public static final int HEADER_SIZE = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTE_LENGTH;
+ public static final int HEADER_SIZE_V0 = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH;
+ public static final int HEADER_SIZE_V1 = CRC_LENGTH + MAGIC_LENGTH + ATTRIBUTES_LENGTH + TIMESTAMP_LENGTH;
/**
* The amount of overhead bytes in a record
*/
- public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+ public static final int RECORD_OVERHEAD_V0 = HEADER_SIZE_V0 + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
/**
* The amount of overhead bytes in a record
*/
- public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE + TIMESTAMP_LENGTH + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
+ public static final int RECORD_OVERHEAD_V1 = HEADER_SIZE_V1 + KEY_SIZE_LENGTH + VALUE_SIZE_LENGTH;
/**
* Specifies the mask for the compression code. 3 bits to hold the compression codec. 0 is reserved to indicate no
@@ -483,19 +484,11 @@ public final class LegacyRecord {
}
}
- public static int recordSize(byte[] key, byte[] value) {
- return recordSize(RecordBatch.CURRENT_MAGIC_VALUE, key, value);
- }
-
- public static int recordSize(byte magic, byte[] key, byte[] value) {
- return recordSize(magic, key == null ? 0 : key.length, value == null ? 0 : value.length);
- }
-
- public static int recordSize(byte magic, ByteBuffer key, ByteBuffer value) {
+ static int recordSize(byte magic, ByteBuffer key, ByteBuffer value) {
return recordSize(magic, key == null ? 0 : key.limit(), value == null ? 0 : value.limit());
}
- private static int recordSize(byte magic, int keySize, int valueSize) {
+ public static int recordSize(byte magic, int keySize, int valueSize) {
return recordOverhead(magic) + keySize + valueSize;
}
@@ -547,16 +540,28 @@ public final class LegacyRecord {
return crc.getValue();
}
- public static int recordOverhead(byte magic) {
+ static int recordOverhead(byte magic) {
if (magic == 0)
return RECORD_OVERHEAD_V0;
- return RECORD_OVERHEAD_V1;
+ else if (magic == 1)
+ return RECORD_OVERHEAD_V1;
+ throw new IllegalArgumentException("Invalid magic used in LegacyRecord: " + magic);
+ }
+
+ static int headerSize(byte magic) {
+ if (magic == 0)
+ return HEADER_SIZE_V0;
+ else if (magic == 1)
+ return HEADER_SIZE_V1;
+ throw new IllegalArgumentException("Invalid magic used in LegacyRecord: " + magic);
}
private static int keyOffset(byte magic) {
if (magic == 0)
return KEY_OFFSET_V0;
- return KEY_OFFSET_V1;
+ else if (magic == 1)
+ return KEY_OFFSET_V1;
+ throw new IllegalArgumentException("Invalid magic used in LegacyRecord: " + magic);
}
public static TimestampType timestampType(byte magic, TimestampType wrapperRecordTimestampType, byte attributes) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 528095e..1d45635 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -506,11 +506,18 @@ public class MemoryRecords extends AbstractRecords {
producerId, producerEpoch, baseSequence, RecordBatch.NO_PARTITION_LEADER_EPOCH, true, records);
}
+ public static MemoryRecords withTransactionalRecords(byte magic, long initialOffset, CompressionType compressionType,
+ long producerId, short producerEpoch, int baseSequence,
+ int partitionLeaderEpoch, SimpleRecord... records) {
+ return withRecords(magic, initialOffset, compressionType, TimestampType.CREATE_TIME, producerId, producerEpoch,
+ baseSequence, partitionLeaderEpoch, true, records);
+ }
+
public static MemoryRecords withTransactionalRecords(long initialOffset, CompressionType compressionType, long producerId,
short producerEpoch, int baseSequence, int partitionLeaderEpoch,
SimpleRecord... records) {
- return withRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType, TimestampType.CREATE_TIME,
- producerId, producerEpoch, baseSequence, partitionLeaderEpoch, true, records);
+ return withTransactionalRecords(RecordBatch.CURRENT_MAGIC_VALUE, initialOffset, compressionType,
+ producerId, producerEpoch, baseSequence, partitionLeaderEpoch, records);
}
public static MemoryRecords withRecords(byte magic, long initialOffset, CompressionType compressionType,
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 6a4d1a1..a5a5036 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -49,6 +49,8 @@ public interface Records {
// the magic offset is at the same offset for all current message formats, but the 4 bytes
// between the size and the magic is dependent on the version.
int MAGIC_OFFSET = 16;
+ int MAGIC_LENGTH = 1;
+ int HEADER_SIZE_UP_TO_MAGIC = MAGIC_OFFSET + MAGIC_LENGTH;
/**
* The size of these records in bytes.
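
For reference, the new constant works out from the existing offsets in this file: the 8-byte offset starts at 0, the 4-byte size at 8, then come 4 version-dependent bytes (CRC for magic v0/v1, partition leader epoch for v2), so MAGIC_OFFSET = 16 and HEADER_SIZE_UP_TO_MAGIC = 16 + 1 = 17. Those 17 bytes are exactly what FileLogInputStream.nextBatch() now reads up front to learn the offset, the size and which batch implementation to instantiate.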
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 7d48623..97a6259 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -303,7 +303,7 @@ public class FetcherTest {
long offset = 0;
long timestamp = 500L;
- int size = LegacyRecord.recordSize(key, value);
+ int size = LegacyRecord.recordSize(magic, key.length, value.length);
byte attributes = LegacyRecord.computeAttributes(magic, CompressionType.NONE, TimestampType.CREATE_TIME);
long crc = LegacyRecord.computeChecksum(magic, attributes, timestamp, key, value);
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
index 7c37354..d5de4bd 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileLogInputStreamTest.java
@@ -16,33 +16,56 @@
*/
package org.apache.kafka.common.record;
+import org.apache.kafka.common.record.FileLogInputStream.FileChannelRecordBatch;
import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.test.TestUtils;
import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
import java.util.List;
+import static java.util.Arrays.asList;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V0;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V1;
+import static org.apache.kafka.common.record.RecordBatch.MAGIC_VALUE_V2;
+import static org.apache.kafka.common.record.RecordBatch.NO_TIMESTAMP;
+import static org.apache.kafka.common.record.TimestampType.CREATE_TIME;
+import static org.apache.kafka.common.record.TimestampType.NO_TIMESTAMP_TYPE;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+@RunWith(value = Parameterized.class)
public class FileLogInputStreamTest {
+ private final byte magic;
+ private final CompressionType compression;
+
+ public FileLogInputStreamTest(byte magic, CompressionType compression) {
+ this.magic = magic;
+ this.compression = compression;
+ }
+
@Test
public void testWriteTo() throws IOException {
try (FileRecords fileRecords = FileRecords.open(tempFile())) {
- fileRecords.append(MemoryRecords.withRecords(CompressionType.NONE, new SimpleRecord("foo".getBytes()),
- new SimpleRecord("bar".getBytes())));
+ fileRecords.append(MemoryRecords.withRecords(magic, compression, new SimpleRecord("foo".getBytes())));
fileRecords.flush();
FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
fileRecords.sizeInBytes());
- FileLogInputStream.FileChannelRecordBatch batch = logInputStream.nextBatch();
+ FileChannelRecordBatch batch = logInputStream.nextBatch();
assertNotNull(batch);
- assertEquals(RecordBatch.MAGIC_VALUE_V2, batch.magic());
+ assertEquals(magic, batch.magic());
ByteBuffer buffer = ByteBuffer.allocate(128);
batch.writeTo(buffer);
@@ -50,13 +73,195 @@ public class FileLogInputStreamTest {
MemoryRecords memRecords = MemoryRecords.readableRecords(buffer);
List<Record> records = Utils.toList(memRecords.records().iterator());
- assertEquals(2, records.size());
+ assertEquals(1, records.size());
Record record0 = records.get(0);
- assertTrue(record0.hasMagic(RecordBatch.MAGIC_VALUE_V2));
+ assertTrue(record0.hasMagic(magic));
assertEquals("foo", Utils.utf8(record0.value(), record0.valueSize()));
- Record record1 = records.get(1);
- assertTrue(record1.hasMagic(RecordBatch.MAGIC_VALUE_V2));
- assertEquals("bar", Utils.utf8(record1.value(), record1.valueSize()));
}
}
+
+ @Test
+ public void testSimpleBatchIteration() throws IOException {
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ SimpleRecord firstBatchRecord = new SimpleRecord(3241324L, "a".getBytes(), "foo".getBytes());
+ SimpleRecord secondBatchRecord = new SimpleRecord(234280L, "b".getBytes(), "bar".getBytes());
+
+ fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecord));
+ fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord));
+ fileRecords.flush();
+
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
+ fileRecords.sizeInBytes());
+
+ FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
+ assertGenericRecordBatchData(firstBatch, 0L, 3241324L, firstBatchRecord);
+ assertNoProducerData(firstBatch);
+
+ FileChannelRecordBatch secondBatch = logInputStream.nextBatch();
+ assertGenericRecordBatchData(secondBatch, 1L, 234280L, secondBatchRecord);
+ assertNoProducerData(secondBatch);
+
+ assertNull(logInputStream.nextBatch());
+ }
+ }
+
+ @Test
+ public void testBatchIterationWithMultipleRecordsPerBatch() throws IOException {
+ if (magic < MAGIC_VALUE_V2 && compression == CompressionType.NONE)
+ return;
+
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
+ new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
+
+ };
+ SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
+ new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
+ new SimpleRecord(897839L, null, "4".getBytes()),
+ new SimpleRecord(8234020L, "e".getBytes(), null)
+ };
+
+ fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecords));
+ fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecords));
+ fileRecords.flush();
+
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
+ fileRecords.sizeInBytes());
+
+ FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
+ assertNoProducerData(firstBatch);
+ assertGenericRecordBatchData(firstBatch, 0L, 3241324L, firstBatchRecords);
+
+ FileChannelRecordBatch secondBatch = logInputStream.nextBatch();
+ assertNoProducerData(secondBatch);
+ assertGenericRecordBatchData(secondBatch, 1L, 238423489L, secondBatchRecords);
+
+ assertNull(logInputStream.nextBatch());
+ }
+ }
+
+ @Test
+ public void testBatchIterationV2() throws IOException {
+ if (magic != MAGIC_VALUE_V2)
+ return;
+
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ long producerId = 83843L;
+ short producerEpoch = 15;
+ int baseSequence = 234;
+ int partitionLeaderEpoch = 9832;
+
+ SimpleRecord[] firstBatchRecords = new SimpleRecord[]{
+ new SimpleRecord(3241324L, "a".getBytes(), "1".getBytes()),
+ new SimpleRecord(234280L, "b".getBytes(), "2".getBytes())
+
+ };
+ SimpleRecord[] secondBatchRecords = new SimpleRecord[]{
+ new SimpleRecord(238423489L, "c".getBytes(), "3".getBytes()),
+ new SimpleRecord(897839L, null, "4".getBytes()),
+ new SimpleRecord(8234020L, "e".getBytes(), null)
+ };
+
+ fileRecords.append(MemoryRecords.withIdempotentRecords(magic, 15L, compression, producerId,
+ producerEpoch, baseSequence, partitionLeaderEpoch, firstBatchRecords));
+ fileRecords.append(MemoryRecords.withTransactionalRecords(magic, 27L, compression, producerId,
+ producerEpoch, baseSequence + firstBatchRecords.length, partitionLeaderEpoch, secondBatchRecords));
+ fileRecords.flush();
+
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
+ fileRecords.sizeInBytes());
+
+ FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
+ assertProducerData(firstBatch, producerId, producerEpoch, baseSequence, false, firstBatchRecords);
+ assertGenericRecordBatchData(firstBatch, 15L, 3241324L, firstBatchRecords);
+ assertEquals(partitionLeaderEpoch, firstBatch.partitionLeaderEpoch());
+
+ FileChannelRecordBatch secondBatch = logInputStream.nextBatch();
+ assertProducerData(secondBatch, producerId, producerEpoch, baseSequence + firstBatchRecords.length,
+ true, secondBatchRecords);
+ assertGenericRecordBatchData(secondBatch, 27L, 238423489L, secondBatchRecords);
+ assertEquals(partitionLeaderEpoch, secondBatch.partitionLeaderEpoch());
+
+ assertNull(logInputStream.nextBatch());
+ }
+ }
+
+ @Test
+ public void testBatchIterationIncompleteBatch() throws IOException {
+ try (FileRecords fileRecords = FileRecords.open(tempFile())) {
+ SimpleRecord firstBatchRecord = new SimpleRecord(100L, "foo".getBytes());
+ SimpleRecord secondBatchRecord = new SimpleRecord(200L, "bar".getBytes());
+
+ fileRecords.append(MemoryRecords.withRecords(magic, 0L, compression, CREATE_TIME, firstBatchRecord));
+ fileRecords.append(MemoryRecords.withRecords(magic, 1L, compression, CREATE_TIME, secondBatchRecord));
+ fileRecords.flush();
+ fileRecords.truncateTo(fileRecords.sizeInBytes() - 13);
+
+ FileLogInputStream logInputStream = new FileLogInputStream(fileRecords.channel(), 0,
+ fileRecords.sizeInBytes());
+
+ FileChannelRecordBatch firstBatch = logInputStream.nextBatch();
+ assertNoProducerData(firstBatch);
+ assertGenericRecordBatchData(firstBatch, 0L, 100L, firstBatchRecord);
+
+ assertNull(logInputStream.nextBatch());
+ }
+ }
+
+ private void assertProducerData(RecordBatch batch, long producerId, short producerEpoch, int baseSequence,
+ boolean isTransactional, SimpleRecord ... records) {
+ assertEquals(producerId, batch.producerId());
+ assertEquals(producerEpoch, batch.producerEpoch());
+ assertEquals(baseSequence, batch.baseSequence());
+ assertEquals(baseSequence + records.length - 1, batch.lastSequence());
+ assertEquals(isTransactional, batch.isTransactional());
+ }
+
+ private void assertNoProducerData(RecordBatch batch) {
+ assertEquals(RecordBatch.NO_PRODUCER_ID, batch.producerId());
+ assertEquals(RecordBatch.NO_PRODUCER_EPOCH, batch.producerEpoch());
+ assertEquals(RecordBatch.NO_SEQUENCE, batch.baseSequence());
+ assertEquals(RecordBatch.NO_SEQUENCE, batch.lastSequence());
+ assertFalse(batch.isTransactional());
+ }
+
+ private void assertGenericRecordBatchData(RecordBatch batch, long baseOffset, long maxTimestamp, SimpleRecord ... records) {
+ assertEquals(magic, batch.magic());
+ assertEquals(compression, batch.compressionType());
+
+ if (magic == MAGIC_VALUE_V0) {
+ assertEquals(NO_TIMESTAMP_TYPE, batch.timestampType());
+ } else {
+ assertEquals(CREATE_TIME, batch.timestampType());
+ assertEquals(maxTimestamp, batch.maxTimestamp());
+ }
+
+ assertEquals(baseOffset + records.length - 1, batch.lastOffset());
+ if (magic >= MAGIC_VALUE_V2)
+ assertEquals(Integer.valueOf(records.length), batch.countOrNull());
+
+ assertEquals(baseOffset, batch.baseOffset());
+ assertTrue(batch.isValid());
+
+ List<Record> batchRecords = TestUtils.toList(batch);
+ for (int i = 0; i < records.length; i++) {
+ assertEquals(baseOffset + i, batchRecords.get(i).offset());
+ assertEquals(records[i].key(), batchRecords.get(i).key());
+ assertEquals(records[i].value(), batchRecords.get(i).value());
+ if (magic == MAGIC_VALUE_V0)
+ assertEquals(NO_TIMESTAMP, batchRecords.get(i).timestamp());
+ else
+ assertEquals(records[i].timestamp(), batchRecords.get(i).timestamp());
+ }
+ }
+
+ @Parameterized.Parameters(name = "magic={0}, compression={1}")
+ public static Collection<Object[]> data() {
+ List<Object[]> values = new ArrayList<>();
+ for (byte magic : asList(MAGIC_VALUE_V0, MAGIC_VALUE_V1, MAGIC_VALUE_V2))
+ for (CompressionType type: CompressionType.values())
+ values.add(new Object[] {magic, type});
+ return values;
+ }
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 11ee419..8b9c900 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -26,11 +26,11 @@ import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
-import java.util.Arrays;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import static java.util.Arrays.asList;
import static org.apache.kafka.test.TestUtils.tempFile;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
@@ -324,8 +324,8 @@ public class FileRecordsTest {
}
private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
- List<Long> offsets = Arrays.asList(0L, 2L, 3L, 9L, 11L, 15L);
- List<SimpleRecord> records = Arrays.asList(
+ List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L);
+ List<SimpleRecord> records = asList(
new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 0922c48..c621d53 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -554,7 +554,7 @@ public class MemoryRecordsBuilderTest {
}
}
- @Parameterized.Parameters
+ @Parameterized.Parameters(name = "bufferOffset={0}, compression={1}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (int bufferOffset : Arrays.asList(0, 15))
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
index e1d2882..c6fa1ce 100644
--- a/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
+++ b/core/src/main/scala/kafka/message/ByteBufferMessageSet.scala
@@ -166,9 +166,9 @@ class ByteBufferMessageSet(val buffer: ByteBuffer) extends MessageSet with Loggi
/** When flag isShallow is set to be true, we do a shallow iteration: just traverse the first level of messages. **/
private def internalIterator(isShallow: Boolean = false): Iterator[MessageAndOffset] = {
if (isShallow)
- asRecords.batches.asScala.iterator.map(batch => MessageAndOffset.fromRecordBatch(batch.asInstanceOf[AbstractLegacyRecordBatch]))
+ asRecords.batches.asScala.iterator.map(MessageAndOffset.fromRecordBatch)
else
- asRecords.records.asScala.iterator.map(record => MessageAndOffset.fromRecordBatch(record.asInstanceOf[AbstractLegacyRecordBatch]))
+ asRecords.records.asScala.iterator.map(MessageAndOffset.fromRecord)
}
/**
http://git-wip-us.apache.org/repos/asf/kafka/blob/81f0c1e8/core/src/main/scala/kafka/message/MessageAndOffset.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/MessageAndOffset.scala b/core/src/main/scala/kafka/message/MessageAndOffset.scala
index 349e90b..8de0f81 100644
--- a/core/src/main/scala/kafka/message/MessageAndOffset.scala
+++ b/core/src/main/scala/kafka/message/MessageAndOffset.scala
@@ -17,11 +17,29 @@
package kafka.message
-import org.apache.kafka.common.record.AbstractLegacyRecordBatch
+import org.apache.kafka.common.record.{AbstractLegacyRecordBatch, Record, RecordBatch}
object MessageAndOffset {
- def fromRecordBatch(recordBatch: AbstractLegacyRecordBatch): MessageAndOffset = {
- MessageAndOffset(Message.fromRecord(recordBatch.outerRecord), recordBatch.lastOffset)
+ def fromRecordBatch(batch: RecordBatch): MessageAndOffset = {
+ batch match {
+ case legacyBatch: AbstractLegacyRecordBatch =>
+ MessageAndOffset(Message.fromRecord(legacyBatch.outerRecord), legacyBatch.lastOffset)
+
+ case _ =>
+ throw new IllegalArgumentException(s"Illegal batch type ${batch.getClass}. The older message format classes " +
+ s"only support conversion from ${classOf[AbstractLegacyRecordBatch]}, which is used for magic v0 and v1")
+ }
+ }
+
+ def fromRecord(record: Record): MessageAndOffset = {
+ record match {
+ case legacyBatch: AbstractLegacyRecordBatch =>
+ MessageAndOffset(Message.fromRecord(legacyBatch.outerRecord), legacyBatch.lastOffset)
+
+ case _ =>
+ throw new IllegalArgumentException(s"Illegal record type ${record.getClass}. The older message format classes " +
+ s"only support conversion from ${classOf[AbstractLegacyRecordBatch]}, which is used for magic v0 and v1")
+ }
}
}