Posted to commits@kafka.apache.org by ij...@apache.org on 2017/06/01 19:05:39 UTC
kafka git commit: KAFKA-5150; reduce lz4 decompression overhead
Repository: kafka
Updated Branches:
refs/heads/0.10.2 386a8d041 -> e944956a3
KAFKA-5150; reduce lz4 decompression overhead
- reuse decompression buffers in consumer Fetcher
- switch lz4 input stream to operate directly on ByteBuffers
- avoids the performance impact of catching exceptions when reaching the end of legacy record batches
- more tests with both compressible / incompressible data, multiple blocks, and various other combinations to increase code coverage
- fixes a bug that would throw an exception instead of reporting an invalid block size for invalid incompressible blocks
- fixes a bug when the incompressible flag is set on the end-frame block size
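The first two bullets describe the pattern now used by the consumer's Fetcher: hold one BufferSupplier per consumer and pass it into deepEntries so the same decompression buffer is reused across compressed batches. A minimal compile-level sketch of that pattern, assuming the BufferSupplier and Records.deepEntries(BufferSupplier) APIs introduced below; the class and helper method names are illustrative, not part of this patch:

    import org.apache.kafka.common.record.BufferSupplier;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.Records;

    public class DeepIterationSketch {
        // One supplier per consumer: BufferSupplier is not thread-safe.
        private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();

        // Iterate the deep (decompressed) entries of a fetched record set, reusing the
        // same decompression buffer instead of allocating 64 KB (for LZ4) per batch.
        public int countRecordsFrom(Records records, long position) {
            int count = 0;
            for (LogEntry entry : records.deepEntries(decompressionBufferSupplier)) {
                if (entry.offset() >= position)
                    count++;
            }
            return count;
        }
    }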
Author: Xavier Léauté <xa...@confluent.io>
Author: Ismael Juma <is...@juma.me.uk>
Reviewers: Jason Gustafson <ja...@confluent.io>, Ismael Juma <is...@juma.me.uk>
Closes #3090 from xvrl/kafka-5150-0.10
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/e944956a
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/e944956a
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/e944956a
Branch: refs/heads/0.10.2
Commit: e944956a35d30ba4e393be887f873f12106efe75
Parents: 386a8d0
Author: Xavier Léauté <xa...@confluent.io>
Authored: Thu Jun 1 20:05:04 2017 +0100
Committer: Ismael Juma <is...@juma.me.uk>
Committed: Thu Jun 1 20:05:11 2017 +0100
----------------------------------------------------------------------
.../clients/consumer/internals/Fetcher.java | 4 +-
.../kafka/common/record/AbstractRecords.java | 4 +-
.../kafka/common/record/BufferSupplier.java | 95 ++++++
.../apache/kafka/common/record/FileRecords.java | 23 +-
.../common/record/KafkaLZ4BlockInputStream.java | 205 +++++++------
.../apache/kafka/common/record/LogEntry.java | 2 +-
.../kafka/common/record/MemoryRecords.java | 19 +-
.../common/record/MemoryRecordsBuilder.java | 22 +-
.../org/apache/kafka/common/record/Records.java | 18 +-
.../kafka/common/record/RecordsIterator.java | 51 ++--
.../org/apache/kafka/common/utils/Utils.java | 24 ++
.../internals/RecordAccumulatorTest.java | 7 +-
.../kafka/common/record/FileRecordsTest.java | 2 +-
.../kafka/common/record/KafkaLZ4Test.java | 286 ++++++++++++++++---
.../kafka/common/record/MemoryRecordsTest.java | 6 +-
.../src/main/scala/kafka/log/LogValidator.scala | 2 +-
.../kafka/message/CompressionFactory.scala | 19 +-
.../kafka/message/MessageCompressionTest.scala | 7 +-
.../unit/kafka/message/MessageWriterTest.scala | 2 +-
19 files changed, 593 insertions(+), 205 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index e2631b5..a3d4a90 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -41,6 +41,7 @@ import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.common.metrics.stats.Rate;
import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.protocol.Errors;
+import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.InvalidRecordException;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.Record;
@@ -93,6 +94,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
private final ConcurrentLinkedQueue<CompletedFetch> completedFetches;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
+ private final BufferSupplier decompressionBufferSupplier = BufferSupplier.create();
private PartitionRecords<K, V> nextInLineRecords = null;
private ExceptionMetadata nextInLineExceptionMetadata = null;
@@ -782,7 +784,7 @@ public class Fetcher<K, V> implements SubscriptionState.Listener {
List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
boolean skippedRecords = false;
- for (LogEntry logEntry : partition.records.deepEntries()) {
+ for (LogEntry logEntry : partition.records.deepEntries(decompressionBufferSupplier)) {
// Skip the messages earlier than current position.
if (logEntry.offset() >= position) {
parsed.add(parseRecord(tp, logEntry));
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 3a96d88..ebfc7ef 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -26,7 +26,7 @@ public abstract class AbstractRecords implements Records {
@Override
public Iterator<Record> iterator() {
return new Iterator<Record>() {
- private final Iterator<? extends LogEntry> deepEntries = deepEntries().iterator();
+ private final Iterator<? extends LogEntry> deepEntries = deepEntries(BufferSupplier.NO_CACHING).iterator();
@Override
public boolean hasNext() {
return deepEntries.hasNext();
@@ -57,7 +57,7 @@ public abstract class AbstractRecords implements Records {
@Override
public Records toMessageFormat(byte toMagic) {
List<LogEntry> converted = new ArrayList<>();
- for (LogEntry entry : deepEntries())
+ for (LogEntry entry : deepEntries(BufferSupplier.NO_CACHING))
converted.add(LogEntry.create(entry.offset(), entry.record().convert(toMagic)));
if (converted.isEmpty()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
new file mode 100644
index 0000000..df7ba0b
--- /dev/null
+++ b/clients/src/main/java/org/apache/kafka/common/record/BufferSupplier.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.record;
+
+import java.nio.ByteBuffer;
+import java.util.ArrayDeque;
+import java.util.Deque;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Simple non-threadsafe interface for caching byte buffers. This is suitable for simple cases like ensuring that
+ * a given KafkaConsumer reuses the same decompression buffer when iterating over fetched records. For small record
+ * batches, allocating a potentially large buffer (64 KB for LZ4) will dominate the cost of decompressing and
+ * iterating over the records in the batch.
+ */
+public abstract class BufferSupplier implements AutoCloseable {
+
+ public static final BufferSupplier NO_CACHING = new BufferSupplier() {
+ @Override
+ public ByteBuffer get(int capacity) {
+ return ByteBuffer.allocate(capacity);
+ }
+
+ @Override
+ public void release(ByteBuffer buffer) {}
+
+ @Override
+ public void close() {}
+ };
+
+ public static BufferSupplier create() {
+ return new DefaultSupplier();
+ }
+
+ /**
+ * Supply a buffer with the required capacity. This may return a cached buffer or allocate a new instance.
+ */
+ public abstract ByteBuffer get(int capacity);
+
+ /**
+ * Return the provided buffer to be reused by a subsequent call to `get`.
+ */
+ public abstract void release(ByteBuffer buffer);
+
+ /**
+ * Release all resources associated with this supplier.
+ */
+ public abstract void close();
+
+ private static class DefaultSupplier extends BufferSupplier {
+ // We currently use a single block size, so optimise for that case
+ private final Map<Integer, Deque<ByteBuffer>> bufferMap = new HashMap<>(1);
+
+ @Override
+ public ByteBuffer get(int size) {
+ Deque<ByteBuffer> bufferQueue = bufferMap.get(size);
+ if (bufferQueue == null || bufferQueue.isEmpty())
+ return ByteBuffer.allocate(size);
+ else
+ return bufferQueue.pollFirst();
+ }
+
+ @Override
+ public void release(ByteBuffer buffer) {
+ buffer.clear();
+ Deque<ByteBuffer> bufferQueue = bufferMap.get(buffer.capacity());
+ if (bufferQueue == null) {
+ // We currently keep a single buffer in flight, so optimise for that case
+ bufferQueue = new ArrayDeque<>(1);
+ bufferMap.put(buffer.capacity(), bufferQueue);
+ }
+ bufferQueue.addLast(buffer);
+ }
+
+ @Override
+ public void close() {
+ bufferMap.clear();
+ }
+ }
+}
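A minimal usage sketch for the supplier added above; the capacity and loop count are arbitrary illustrative values:

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.BufferSupplier;

    public class BufferSupplierSketch {
        public static void main(String[] args) {
            BufferSupplier supplier = BufferSupplier.create();
            for (int i = 0; i < 3; i++) {
                // The first call allocates; subsequent calls with the same capacity
                // return the cached buffer released below.
                ByteBuffer buffer = supplier.get(64 * 1024);
                try {
                    buffer.put((byte) i); // stand-in for decompression work
                } finally {
                    // Hand the buffer back so the next get(64 * 1024) can reuse it.
                    supplier.release(buffer);
                }
            }
            supplier.close();
        }
    }

The supplier is deliberately non-threadsafe and keyed by capacity, which matches the single block size and single in-flight buffer used during decompression.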
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 960b716..fd8df3e 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -43,13 +43,6 @@ public class FileRecords extends AbstractRecords implements Closeable {
private final Iterable<FileChannelLogEntry> shallowEntries;
- private final Iterable<LogEntry> deepEntries = new Iterable<LogEntry>() {
- @Override
- public Iterator<LogEntry> iterator() {
- return deepIterator();
- }
- };
-
// mutable state
private final AtomicInteger size;
private final FileChannel channel;
@@ -362,18 +355,28 @@ public class FileRecords extends AbstractRecords implements Closeable {
}
@Override
+ public Iterable<LogEntry> deepEntries(final BufferSupplier bufferSupplier) {
+ return new Iterable<LogEntry>() {
+ @Override
+ public Iterator<LogEntry> iterator() {
+ return deepIterator(bufferSupplier);
+ }
+ };
+ }
+
+ @Override
public Iterable<LogEntry> deepEntries() {
- return deepEntries;
+ return deepEntries(BufferSupplier.NO_CACHING);
}
- private Iterator<LogEntry> deepIterator() {
+ private Iterator<LogEntry> deepIterator(BufferSupplier bufferSupplier) {
final int end;
if (isSlice)
end = this.end;
else
end = this.sizeInBytes();
FileLogInputStream inputStream = new FileLogInputStream(channel, Integer.MAX_VALUE, start, end);
- return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE);
+ return new RecordsIterator(inputStream, false, false, Integer.MAX_VALUE, bufferSupplier);
}
public static FileRecords open(File file,
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index a408580..3d9d86b 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -14,83 +14,78 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.kafka.common.record;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_MAX_HEADER_LENGTH;
-import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
-
-import java.io.FilterInputStream;
-import java.io.IOException;
-import java.io.InputStream;
-
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
-import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
-import org.apache.kafka.common.utils.Utils;
-
import net.jpountz.lz4.LZ4Exception;
import net.jpountz.lz4.LZ4Factory;
import net.jpountz.lz4.LZ4SafeDecompressor;
import net.jpountz.xxhash.XXHash32;
import net.jpountz.xxhash.XXHashFactory;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.BD;
+import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.FLG;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.MAGIC;
+
/**
* A partial implementation of the v1.5.1 LZ4 Frame format.
*
- * @see <a href="http://cyan4973.github.io/lz4/lz4_Frame_format.html">LZ4 Frame Format</a>
+ * @see <a href="https://github.com/lz4/lz4/wiki/lz4_Frame_format.md">LZ4 Frame Format</a>
+ *
+ * This class is not thread-safe.
*/
-public final class KafkaLZ4BlockInputStream extends FilterInputStream {
+public final class KafkaLZ4BlockInputStream extends InputStream {
public static final String PREMATURE_EOS = "Stream ended prematurely";
public static final String NOT_SUPPORTED = "Stream unsupported (invalid magic bytes)";
public static final String BLOCK_HASH_MISMATCH = "Block checksum mismatch";
public static final String DESCRIPTOR_HASH_MISMATCH = "Stream frame descriptor corrupted";
- private final LZ4SafeDecompressor decompressor;
- private final XXHash32 checksum;
- private final byte[] buffer;
- private final byte[] compressedBuffer;
- private final int maxBlockSize;
+ private static final LZ4SafeDecompressor DECOMPRESSOR = LZ4Factory.fastestInstance().safeDecompressor();
+ private static final XXHash32 CHECKSUM = XXHashFactory.fastestInstance().hash32();
+
+ private final ByteBuffer in;
private final boolean ignoreFlagDescriptorChecksum;
+ private final BufferSupplier bufferSupplier;
+ private final ByteBuffer decompressionBuffer;
+ // `flg` and `maxBlockSize` are effectively final, they are initialised in the `readHeader` method that is only
+ // invoked from the constructor
private FLG flg;
- private BD bd;
- private int bufferOffset;
- private int bufferSize;
+ private int maxBlockSize;
+
+ // If a block is compressed, this is the same as `decompressionBuffer`. If a block is not compressed, this is
+ // a slice of `in` to avoid unnecessary copies.
+ private ByteBuffer decompressedBuffer;
private boolean finished;
/**
* Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
*
- * @param in The stream to decompress
+ * @param in The byte buffer to decompress
* @param ignoreFlagDescriptorChecksum for compatibility with old kafka clients, ignore incorrect HC byte
* @throws IOException
*/
- public KafkaLZ4BlockInputStream(InputStream in, boolean ignoreFlagDescriptorChecksum) throws IOException {
- super(in);
- decompressor = LZ4Factory.fastestInstance().safeDecompressor();
- checksum = XXHashFactory.fastestInstance().hash32();
+ public KafkaLZ4BlockInputStream(ByteBuffer in, BufferSupplier bufferSupplier, boolean ignoreFlagDescriptorChecksum) throws IOException {
this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
+ this.in = in.duplicate().order(ByteOrder.LITTLE_ENDIAN);
+ this.bufferSupplier = bufferSupplier;
readHeader();
- maxBlockSize = bd.getBlockMaximumSize();
- buffer = new byte[maxBlockSize];
- compressedBuffer = new byte[maxBlockSize];
- bufferOffset = 0;
- bufferSize = 0;
+ decompressionBuffer = bufferSupplier.get(maxBlockSize);
+ if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
+ // require array backed decompression buffer with zero offset
+ // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
+ throw new RuntimeException("decompression buffer must have backing array with zero array offset");
+ }
finished = false;
}
/**
- * Create a new {@link InputStream} that will decompress data using the LZ4 algorithm.
- *
- * @param in The stream to decompress
- * @throws IOException
- */
- public KafkaLZ4BlockInputStream(InputStream in) throws IOException {
- this(in, false);
- }
-
- /**
* Check whether KafkaLZ4BlockInputStream is configured to ignore the
* Frame Descriptor checksum, which is useful for compatibility with
* old client implementations that use incorrect checksum calculations.
@@ -100,43 +95,50 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
}
/**
- * Reads the magic number and frame descriptor from the underlying {@link InputStream}.
+ * Reads the magic number and frame descriptor from input buffer.
*
* @throws IOException
*/
private void readHeader() throws IOException {
- byte[] header = new byte[LZ4_MAX_HEADER_LENGTH];
-
// read first 6 bytes into buffer to check magic and FLG/BD descriptor flags
- int headerOffset = 6;
- if (in.read(header, 0, headerOffset) != headerOffset) {
+ if (in.remaining() < 6) {
throw new IOException(PREMATURE_EOS);
}
- if (MAGIC != Utils.readUnsignedIntLE(header, headerOffset - 6)) {
+ if (MAGIC != in.getInt()) {
throw new IOException(NOT_SUPPORTED);
}
- flg = FLG.fromByte(header[headerOffset - 2]);
- bd = BD.fromByte(header[headerOffset - 1]);
+ // mark start of data to checksum
+ in.mark();
+
+ flg = FLG.fromByte(in.get());
+ maxBlockSize = BD.fromByte(in.get()).getBlockMaximumSize();
if (flg.isContentSizeSet()) {
- if (in.read(header, headerOffset, 8) != 8)
+ if (in.remaining() < 8) {
throw new IOException(PREMATURE_EOS);
- headerOffset += 8;
+ }
+ in.position(in.position() + 8);
}
// Final byte of Frame Descriptor is HC checksum
- header[headerOffset++] = (byte) in.read();
// Old implementations produced incorrect HC checksums
- if (ignoreFlagDescriptorChecksum)
+ if (ignoreFlagDescriptorChecksum) {
+ in.position(in.position() + 1);
return;
+ }
+
+ int len = in.position() - in.reset().position();
- int offset = 4;
- int len = headerOffset - offset - 1; // dont include magic bytes or HC
- byte hash = (byte) ((checksum.hash(header, offset, len, 0) >> 8) & 0xFF);
- if (hash != header[headerOffset - 1])
+ int hash = in.hasArray() ?
+ // workaround for https://github.com/lz4/lz4-java/pull/65
+ CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
+ CHECKSUM.hash(in, in.position(), len, 0);
+ in.position(in.position() + len);
+ if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
+ }
}
/**
@@ -146,46 +148,70 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
* @throws IOException
*/
private void readBlock() throws IOException {
- int blockSize = Utils.readUnsignedIntLE(in);
+ if (in.remaining() < 4) {
+ throw new IOException(PREMATURE_EOS);
+ }
+
+ int blockSize = in.getInt();
+ boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
+ blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
// Check for EndMark
if (blockSize == 0) {
finished = true;
if (flg.isContentChecksumSet())
- Utils.readUnsignedIntLE(in); // TODO: verify this content checksum
+ in.getInt(); // TODO: verify this content checksum
return;
} else if (blockSize > maxBlockSize) {
throw new IOException(String.format("Block size %s exceeded max: %s", blockSize, maxBlockSize));
}
- boolean compressed = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) == 0;
- byte[] bufferToRead;
- if (compressed) {
- bufferToRead = compressedBuffer;
- } else {
- blockSize &= ~LZ4_FRAME_INCOMPRESSIBLE_MASK;
- bufferToRead = buffer;
- bufferSize = blockSize;
- }
-
- if (in.read(bufferToRead, 0, blockSize) != blockSize) {
+ if (in.remaining() < blockSize) {
throw new IOException(PREMATURE_EOS);
}
- // verify checksum
- if (flg.isBlockChecksumSet() && Utils.readUnsignedIntLE(in) != checksum.hash(bufferToRead, 0, blockSize, 0)) {
- throw new IOException(BLOCK_HASH_MISMATCH);
- }
-
if (compressed) {
try {
- bufferSize = decompressor.decompress(compressedBuffer, 0, blockSize, buffer, 0, maxBlockSize);
+ // workaround for https://github.com/lz4/lz4-java/pull/65
+ final int bufferSize;
+ if (in.hasArray()) {
+ bufferSize = DECOMPRESSOR.decompress(
+ in.array(),
+ in.position() + in.arrayOffset(),
+ blockSize,
+ decompressionBuffer.array(),
+ 0,
+ maxBlockSize
+ );
+ } else {
+ // decompressionBuffer has zero arrayOffset, so we don't need to worry about
+ // https://github.com/lz4/lz4-java/pull/65
+ bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
+ }
+ decompressionBuffer.position(0);
+ decompressionBuffer.limit(bufferSize);
+ decompressedBuffer = decompressionBuffer;
} catch (LZ4Exception e) {
throw new IOException(e);
}
+ } else {
+ decompressedBuffer = in.slice();
+ decompressedBuffer.limit(blockSize);
}
- bufferOffset = 0;
+ // verify checksum
+ if (flg.isBlockChecksumSet()) {
+ // workaround for https://github.com/lz4/lz4-java/pull/65
+ int hash = in.hasArray() ?
+ CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
+ CHECKSUM.hash(in, in.position(), blockSize, 0);
+ in.position(in.position() + blockSize);
+ if (hash != in.getInt()) {
+ throw new IOException(BLOCK_HASH_MISMATCH);
+ }
+ } else {
+ in.position(in.position() + blockSize);
+ }
}
@Override
@@ -200,7 +226,7 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
return -1;
}
- return buffer[bufferOffset++] & 0xFF;
+ return decompressedBuffer.get() & 0xFF;
}
@Override
@@ -216,8 +242,8 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
return -1;
}
len = Math.min(len, available());
- System.arraycopy(buffer, bufferOffset, b, off, len);
- bufferOffset += len;
+
+ decompressedBuffer.get(b, off, len);
return len;
}
@@ -232,28 +258,28 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
if (finished) {
return 0;
}
- n = Math.min(n, available());
- bufferOffset += n;
- return n;
+ int skipped = (int) Math.min(n, available());
+ decompressedBuffer.position(decompressedBuffer.position() + skipped);
+ return skipped;
}
@Override
public int available() throws IOException {
- return bufferSize - bufferOffset;
+ return decompressedBuffer == null ? 0 : decompressedBuffer.remaining();
}
@Override
public void close() throws IOException {
- in.close();
+ bufferSupplier.release(decompressionBuffer);
}
@Override
- public synchronized void mark(int readlimit) {
+ public void mark(int readlimit) {
throw new RuntimeException("mark not supported");
}
@Override
- public synchronized void reset() throws IOException {
+ public void reset() throws IOException {
throw new RuntimeException("reset not supported");
}
@@ -261,5 +287,4 @@ public final class KafkaLZ4BlockInputStream extends FilterInputStream {
public boolean markSupported() {
return false;
}
-
}
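A round-trip sketch of the ByteBuffer-based input stream above, paired with KafkaLZ4BlockOutputStream via the 4-argument constructor exercised by the updated KafkaLZ4Test; the payload contents and sizes are illustrative:

    import java.io.ByteArrayOutputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;
    import org.apache.kafka.common.record.BufferSupplier;
    import org.apache.kafka.common.record.KafkaLZ4BlockInputStream;
    import org.apache.kafka.common.record.KafkaLZ4BlockOutputStream;

    public class Lz4RoundTripSketch {
        public static void main(String[] args) throws IOException {
            byte[] payload = "hello, lz4 frame".getBytes(StandardCharsets.UTF_8);

            // Compress into an LZ4 frame: 64 KB block size, no block checksum,
            // correct (non-broken) frame descriptor checksum.
            ByteArrayOutputStream output = new ByteArrayOutputStream();
            KafkaLZ4BlockOutputStream out = new KafkaLZ4BlockOutputStream(
                output, KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB, false, false);
            out.write(payload, 0, payload.length);
            out.close();

            // Decompress straight from a ByteBuffer; the supplier provides the
            // decompression buffer and receives it back when the stream is closed.
            BufferSupplier supplier = BufferSupplier.create();
            KafkaLZ4BlockInputStream in = new KafkaLZ4BlockInputStream(
                ByteBuffer.wrap(output.toByteArray()), supplier, false);
            byte[] decompressed = new byte[payload.length];
            int read = in.read(decompressed, 0, decompressed.length);
            in.close();
            System.out.println(read + " bytes: " + new String(decompressed, StandardCharsets.UTF_8));
        }
    }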
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
index d2db356..51c9ebb 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/LogEntry.java
@@ -107,7 +107,7 @@ public abstract class LogEntry implements Iterable<LogEntry> {
@Override
public Iterator<LogEntry> iterator() {
if (isCompressed())
- return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE);
+ return new RecordsIterator.DeepRecordsIterator(this, false, Integer.MAX_VALUE, BufferSupplier.NO_CACHING);
return Collections.singletonList(this).iterator();
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index f1a6e43..6cc70be 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -43,8 +43,6 @@ public class MemoryRecords extends AbstractRecords {
}
};
- private final Iterable<LogEntry> deepEntries = deepEntries(false);
-
private int validBytes = -1;
// Construct a writable memory records
@@ -231,27 +229,32 @@ public class MemoryRecords extends AbstractRecords {
}
@Override
+ public Iterable<LogEntry> deepEntries(BufferSupplier bufferSupplier) {
+ return deepEntries(false, bufferSupplier);
+ }
+
+ @Override
public Iterable<LogEntry> deepEntries() {
- return deepEntries;
+ return deepEntries(false, BufferSupplier.NO_CACHING);
}
- public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic) {
+ public Iterable<LogEntry> deepEntries(final boolean ensureMatchingMagic, final BufferSupplier bufferSupplier) {
return new Iterable<LogEntry>() {
@Override
public Iterator<LogEntry> iterator() {
- return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE);
+ return deepIterator(ensureMatchingMagic, Integer.MAX_VALUE, bufferSupplier);
}
};
}
- private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize) {
+ private Iterator<LogEntry> deepIterator(boolean ensureMatchingMagic, int maxMessageSize, BufferSupplier bufferSupplier) {
return new RecordsIterator(new ByteBufferLogInputStream(buffer.duplicate(), maxMessageSize), false,
- ensureMatchingMagic, maxMessageSize);
+ ensureMatchingMagic, maxMessageSize, bufferSupplier);
}
@Override
public String toString() {
- Iterator<LogEntry> iter = deepEntries().iterator();
+ Iterator<LogEntry> iter = deepEntries(BufferSupplier.NO_CACHING).iterator();
StringBuilder builder = new StringBuilder();
builder.append('[');
while (iter.hasNext()) {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
index a46c1c6..7775140 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java
@@ -18,7 +18,6 @@ package org.apache.kafka.common.record;
import org.apache.kafka.common.KafkaException;
-import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.InputStream;
@@ -81,7 +80,7 @@ public class MemoryRecordsBuilder {
@Override
public Constructor get() throws ClassNotFoundException, NoSuchMethodException {
return Class.forName("org.apache.kafka.common.record.KafkaLZ4BlockInputStream")
- .getConstructor(InputStream.class, Boolean.TYPE);
+ .getConstructor(ByteBuffer.class, BufferSupplier.class, Boolean.TYPE);
}
});
@@ -408,7 +407,7 @@ public class MemoryRecordsBuilder {
return builtRecords != null ? builtRecords.sizeInBytes() : estimatedBytesWritten();
}
- private static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) {
+ protected static DataOutputStream wrapForOutput(ByteBufferOutputStream buffer, CompressionType type, byte messageVersion, int bufferSize) {
try {
switch (type) {
case NONE:
@@ -438,25 +437,26 @@ public class MemoryRecordsBuilder {
}
}
- public static DataInputStream wrapForInput(ByteBufferInputStream buffer, CompressionType type, byte messageVersion) {
+ public static InputStream wrapForInput(ByteBuffer buffer, CompressionType type, byte messageVersion, BufferSupplier bufferSupplier) {
try {
switch (type) {
case NONE:
- return buffer;
+ return new ByteBufferInputStream(buffer);
case GZIP:
- return new DataInputStream(new GZIPInputStream(buffer));
+ return new GZIPInputStream(new ByteBufferInputStream(buffer));
case SNAPPY:
try {
- InputStream stream = (InputStream) snappyInputStreamSupplier.get().newInstance(buffer);
- return new DataInputStream(stream);
+ return (InputStream) snappyInputStreamSupplier.get().newInstance(new ByteBufferInputStream(buffer));
} catch (Exception e) {
throw new KafkaException(e);
}
case LZ4:
try {
- InputStream stream = (InputStream) lz4InputStreamSupplier.get().newInstance(buffer,
- messageVersion == Record.MAGIC_VALUE_V0);
- return new DataInputStream(stream);
+ return (InputStream) lz4InputStreamSupplier.get().newInstance(
+ buffer,
+ bufferSupplier,
+ messageVersion == Record.MAGIC_VALUE_V0
+ );
} catch (Exception e) {
throw new KafkaException(e);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index 9235f92..2a3e506 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -24,7 +24,7 @@ import java.nio.channels.GatheringByteChannel;
* Each log entry consists of an 8 byte offset, a 4 byte record size, and a "shallow" {@link Record record}.
* If the entry is not compressed, then each entry will have only the shallow record contained inside it. If it is
* compressed, the entry contains "deep" records, which are packed into the value field of the shallow record. To iterate
- * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries()}. Note
+ * over the shallow records, use {@link #shallowEntries()}; for the deep records, use {@link #deepEntries(BufferSupplier)}. Note
* that the deep iterator handles both compressed and non-compressed entries: if the entry is not compressed, the
* shallow record is returned; otherwise, the shallow record is decompressed and the deep entries are returned.
* See {@link MemoryRecords} for the in-memory representation and {@link FileRecords} for the on-disk representation.
@@ -67,6 +67,22 @@ public interface Records {
* there are fewer options for optimization since the data must be decompressed before it can be
* returned. Hence there is little advantage in allowing subclasses to return a more specific type
* as we do for {@link #shallowEntries()}.
+ *
+ * @param decompressionBufferSupplier The supplier of ByteBuffer(s) used for decompression if supported.
+ * For small record batches, allocating a potentially large buffer (64 KB for LZ4)
+ * will dominate the cost of decompressing and iterating over the records in the
+ * batch. As such, a supplier that reuses buffers will have a significant
+ * performance impact.
+ * @return An iterator over the deep entries of the log
+ */
+ Iterable<LogEntry> deepEntries(BufferSupplier decompressionBufferSupplier);
+
+ /**
+ * Get the deep log entries (i.e. descend into compressed message sets). For the deep records,
+ * there are fewer options for optimization since the data must be decompressed before it can be
+ * returned. Hence there is little advantage in allowing subclasses to return a more specific type
+ * as we do for {@link #shallowEntries()}.
+ *
* @return An iterator over the deep entries of the log
*/
Iterable<LogEntry> deepEntries();
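As the parameter documentation above suggests, the choice of supplier depends on the call site: one-shot paths can keep using BufferSupplier.NO_CACHING, while hot paths hold a single reusable supplier, which is AutoCloseable and so works with try-with-resources. A sketch assuming only the APIs added in this patch; the method names are illustrative:

    import org.apache.kafka.common.record.BufferSupplier;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.Records;

    public class DeepEntriesSupplierChoice {
        // One-shot iteration (e.g. toString or format conversion): a fresh buffer
        // per compressed batch is acceptable, so NO_CACHING is fine.
        static int countOnce(Records records) {
            int n = 0;
            for (LogEntry entry : records.deepEntries(BufferSupplier.NO_CACHING))
                n++;
            return n;
        }

        // Repeated iteration over many record sets: reuse one supplier throughout
        // so the decompression buffer is allocated once.
        static long sumOffsets(Iterable<Records> recordSets) {
            long sum = 0;
            try (BufferSupplier supplier = BufferSupplier.create()) {
                for (Records records : recordSets)
                    for (LogEntry entry : records.deepEntries(supplier))
                        sum += entry.offset();
            }
            return sum;
        }
    }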
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
index 792a857..3150be3 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/RecordsIterator.java
@@ -21,9 +21,8 @@ import org.apache.kafka.common.errors.CorruptRecordException;
import org.apache.kafka.common.utils.AbstractIterator;
import org.apache.kafka.common.utils.Utils;
-import java.io.DataInputStream;
-import java.io.EOFException;
import java.io.IOException;
+import java.io.InputStream;
import java.nio.ByteBuffer;
import java.util.ArrayDeque;
import java.util.Iterator;
@@ -36,16 +35,19 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
private final boolean ensureMatchingMagic;
private final int maxRecordSize;
private final ShallowRecordsIterator<?> shallowIter;
+ private final BufferSupplier bufferSupplier;
private DeepRecordsIterator innerIter;
public RecordsIterator(LogInputStream<?> logInputStream,
boolean shallow,
boolean ensureMatchingMagic,
- int maxRecordSize) {
+ int maxRecordSize,
+ BufferSupplier bufferSupplier) {
this.shallowIter = new ShallowRecordsIterator<>(logInputStream);
this.shallow = shallow;
this.ensureMatchingMagic = ensureMatchingMagic;
this.maxRecordSize = maxRecordSize;
+ this.bufferSupplier = bufferSupplier;
}
/**
@@ -76,7 +78,7 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
// would not try to further decompress underlying messages
// There will be at least one element in the inner iterator, so we don't
// need to call hasNext() here.
- innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize);
+ innerIter = new DeepRecordsIterator(entry, ensureMatchingMagic, maxRecordSize, bufferSupplier);
return innerIter.next();
}
} else {
@@ -89,30 +91,35 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
}
private static final class DataLogInputStream implements LogInputStream<LogEntry> {
- private final DataInputStream stream;
+
+ private final InputStream stream;
protected final int maxMessageSize;
+ private final ByteBuffer offsetAndSizeBuffer;
- DataLogInputStream(DataInputStream stream, int maxMessageSize) {
+ DataLogInputStream(InputStream stream, int maxMessageSize) {
this.stream = stream;
this.maxMessageSize = maxMessageSize;
+ this.offsetAndSizeBuffer = ByteBuffer.allocate(Records.LOG_OVERHEAD);
}
public LogEntry nextEntry() throws IOException {
- try {
- long offset = stream.readLong();
- int size = stream.readInt();
- if (size < Record.RECORD_OVERHEAD_V0)
- throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
- if (size > maxMessageSize)
- throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
-
- byte[] recordBuffer = new byte[size];
- stream.readFully(recordBuffer, 0, size);
- ByteBuffer buf = ByteBuffer.wrap(recordBuffer);
- return LogEntry.create(offset, new Record(buf));
- } catch (EOFException e) {
+ offsetAndSizeBuffer.clear();
+ Utils.readFully(stream, offsetAndSizeBuffer);
+ if (offsetAndSizeBuffer.hasRemaining())
return null;
- }
+ long offset = offsetAndSizeBuffer.getLong(Records.OFFSET_OFFSET);
+ int size = offsetAndSizeBuffer.getInt(Records.SIZE_OFFSET);
+ if (size < Record.RECORD_OVERHEAD_V0)
+ throw new CorruptRecordException(String.format("Record size is less than the minimum record overhead (%d)", Record.RECORD_OVERHEAD_V0));
+ if (size > maxMessageSize)
+ throw new CorruptRecordException(String.format("Record size exceeds the largest allowable message size (%d).", maxMessageSize));
+
+ ByteBuffer batchBuffer = ByteBuffer.allocate(size);
+ Utils.readFully(stream, batchBuffer);
+ if (batchBuffer.hasRemaining())
+ return null;
+ batchBuffer.flip();
+ return LogEntry.create(offset, new Record(batchBuffer));
}
}
@@ -141,13 +148,13 @@ public class RecordsIterator extends AbstractIterator<LogEntry> {
private final long absoluteBaseOffset;
private final byte wrapperMagic;
- public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize) {
+ public DeepRecordsIterator(LogEntry wrapperEntry, boolean ensureMatchingMagic, int maxMessageSize, BufferSupplier bufferSupplier) {
Record wrapperRecord = wrapperEntry.record();
this.wrapperMagic = wrapperRecord.magic();
CompressionType compressionType = wrapperRecord.compressionType();
ByteBuffer buffer = wrapperRecord.value();
- DataInputStream stream = MemoryRecordsBuilder.wrapForInput(new ByteBufferInputStream(buffer), compressionType, wrapperRecord.magic());
+ InputStream stream = MemoryRecordsBuilder.wrapForInput(buffer, compressionType, wrapperRecord.magic(), bufferSupplier);
LogInputStream logStream = new DataLogInputStream(stream, maxMessageSize);
long wrapperRecordOffset = wrapperEntry.offset();
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
index afa85bd..1751e44 100755
--- a/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
+++ b/clients/src/main/java/org/apache/kafka/common/utils/Utils.java
@@ -855,4 +855,28 @@ public class Utils {
currentPosition += bytesRead;
} while (bytesRead != -1 && destinationBuffer.hasRemaining());
}
+
+ /**
+ * Read data from the input stream to the given byte buffer until there are no bytes remaining in the buffer or the
+ * end of the stream has been reached.
+ *
+ * @param inputStream Input stream to read from
+ * @param destinationBuffer The buffer into which bytes are to be transferred (it must be backed by an array)
+ * @throws IOException If an I/O error occurs
+ */
+ public static final void readFully(InputStream inputStream, ByteBuffer destinationBuffer) throws IOException {
+ if (!destinationBuffer.hasArray())
+ throw new IllegalArgumentException("destinationBuffer must be backed by an array");
+ int initialOffset = destinationBuffer.arrayOffset() + destinationBuffer.position();
+ byte[] array = destinationBuffer.array();
+ int length = destinationBuffer.remaining();
+ int totalBytesRead = 0;
+ do {
+ int bytesRead = inputStream.read(array, initialOffset + totalBytesRead, length - totalBytesRead);
+ if (bytesRead == -1)
+ break;
+ totalBytesRead += bytesRead;
+ } while (length > totalBytesRead);
+ destinationBuffer.position(destinationBuffer.position() + totalBytesRead);
+ }
}
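A minimal usage sketch for the new Utils.readFully overload (the data values are illustrative): the destination buffer must be array-backed, and a short read leaves bytes remaining, which is how DataLogInputStream.nextEntry detects a truncated entry.

    import java.io.ByteArrayInputStream;
    import java.io.IOException;
    import java.nio.ByteBuffer;
    import org.apache.kafka.common.utils.Utils;

    public class ReadFullySketch {
        public static void main(String[] args) throws IOException {
            byte[] data = {1, 2, 3, 4, 5};
            ByteArrayInputStream stream = new ByteArrayInputStream(data);

            // Destination must be array-backed; direct buffers are rejected.
            ByteBuffer buffer = ByteBuffer.allocate(8);
            Utils.readFully(stream, buffer);

            // Only 5 of 8 bytes could be read, so 3 remain unfilled.
            System.out.println("read " + buffer.position() + " bytes, " + buffer.remaining() + " remaining");
        }
    }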
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
index f8bb1e9..5f26c13 100644
--- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/RecordAccumulatorTest.java
@@ -20,6 +20,7 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.record.BufferSupplier;
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.LogEntry;
import org.apache.kafka.common.record.Record;
@@ -104,7 +105,7 @@ public class RecordAccumulatorTest {
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
+ Iterator<LogEntry> iter = batch.records().deepEntries(BufferSupplier.NO_CACHING).iterator();
for (int i = 0; i < appends; i++) {
LogEntry entry = iter.next();
assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
@@ -133,7 +134,7 @@ public class RecordAccumulatorTest {
assertEquals(1, batches.size());
RecordBatch batch = batches.get(0);
- Iterator<LogEntry> iter = batch.records().deepEntries().iterator();
+ Iterator<LogEntry> iter = batch.records().deepEntries(BufferSupplier.NO_CACHING).iterator();
LogEntry entry = iter.next();
assertEquals("Keys should match", ByteBuffer.wrap(key), entry.record().key());
assertEquals("Values should match", ByteBuffer.wrap(value), entry.record().value());
@@ -185,7 +186,7 @@ public class RecordAccumulatorTest {
List<RecordBatch> batches = accum.drain(cluster, nodes, 5 * 1024, 0).get(node1.id());
if (batches != null) {
for (RecordBatch batch : batches) {
- for (LogEntry entry : batch.records().deepEntries())
+ for (LogEntry entry : batch.records().deepEntries(BufferSupplier.NO_CACHING))
read++;
accum.deallocate(batch);
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 274bf9d..dcbadda 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -385,7 +385,7 @@ public class FileRecordsTest {
}
private static List<LogEntry> deepEntries(Records buffer) {
- return TestUtils.toList(buffer.deepEntries());
+ return TestUtils.toList(buffer.deepEntries(BufferSupplier.NO_CACHING));
}
private FileRecords createFileRecords(Record ... records) throws IOException {
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
index 47aebcb..74e9713 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/KafkaLZ4Test.java
@@ -16,63 +16,166 @@
*/
package org.apache.kafka.common.record;
-import static org.junit.Assert.assertArrayEquals;
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
+import net.jpountz.xxhash.XXHashFactory;
+
+import org.hamcrest.CoreMatchers;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
-import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
+import java.util.Random;
-import net.jpountz.xxhash.XXHashFactory;
-
-import org.junit.Test;
-import org.junit.runner.RunWith;
-import org.junit.runners.Parameterized;
-import org.junit.runners.Parameterized.Parameters;
+import static org.apache.kafka.common.record.KafkaLZ4BlockOutputStream.LZ4_FRAME_INCOMPRESSIBLE_MASK;
+import static org.junit.Assert.assertArrayEquals;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
@RunWith(value = Parameterized.class)
public class KafkaLZ4Test {
+ private final static Random RANDOM = new Random(0);
+
private final boolean useBrokenFlagDescriptorChecksum;
private final boolean ignoreFlagDescriptorChecksum;
private final byte[] payload;
private final boolean close;
+ private final boolean blockChecksum;
+
+ static class Payload {
+ String name;
+ byte[] payload;
+
+ Payload(String name, byte[] payload) {
+ this.name = name;
+ this.payload = payload;
+ }
+
+ @Override
+ public String toString() {
+ return "Payload{" +
+ "size=" + payload.length +
+ ", name='" + name + '\'' +
+ '}';
+ }
+ }
+
+ @Rule
+ public ExpectedException thrown = ExpectedException.none();
- public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum, byte[] payload, boolean close) {
+ @Parameters(name = "{index} useBrokenFlagDescriptorChecksum={0}, ignoreFlagDescriptorChecksum={1}, blockChecksum={2}, close={3}, payload={4}")
+ public static Collection<Object[]> data() {
+ List<Payload> payloads = new ArrayList<>();
+
+ payloads.add(new Payload("empty", new byte[]{}));
+ payloads.add(new Payload("onebyte", new byte[]{1}));
+
+ for (int size : Arrays.asList(1000, 1 << 16, (1 << 10) * 96)) {
+ byte[] random = new byte[size];
+ RANDOM.nextBytes(random);
+ payloads.add(new Payload("random", random));
+
+ byte[] ones = new byte[size];
+ Arrays.fill(ones, (byte) 1);
+ payloads.add(new Payload("ones", ones));
+ }
+
+ List<Object[]> values = new ArrayList<>();
+ for (Payload payload : payloads)
+ for (boolean broken : Arrays.asList(false, true))
+ for (boolean ignore : Arrays.asList(false, true))
+ for (boolean blockChecksum : Arrays.asList(false, true))
+ for (boolean close : Arrays.asList(false, true))
+ values.add(new Object[]{broken, ignore, blockChecksum, close, payload});
+ return values;
+ }
+
+ public KafkaLZ4Test(boolean useBrokenFlagDescriptorChecksum, boolean ignoreFlagDescriptorChecksum,
+ boolean blockChecksum, boolean close, Payload payload) {
this.useBrokenFlagDescriptorChecksum = useBrokenFlagDescriptorChecksum;
this.ignoreFlagDescriptorChecksum = ignoreFlagDescriptorChecksum;
- this.payload = payload;
+ this.payload = payload.payload;
this.close = close;
+ this.blockChecksum = blockChecksum;
}
- @Parameters
- public static Collection<Object[]> data() {
- byte[] payload = new byte[1000];
- Arrays.fill(payload, (byte) 1);
- List<Object[]> values = new ArrayList<Object[]>();
- for (boolean broken : Arrays.asList(false, true))
- for (boolean ignore : Arrays.asList(false, true))
- for (boolean close : Arrays.asList(false, true))
- values.add(new Object[] {broken, ignore, payload, close});
- return values;
+ @Test
+ public void testHeaderPrematureEnd() throws Exception {
+ thrown.expect(IOException.class);
+ thrown.expectMessage(KafkaLZ4BlockInputStream.PREMATURE_EOS);
+
+ final ByteBuffer buffer = ByteBuffer.allocate(2);
+ makeInputStream(buffer);
+ }
+
+ private KafkaLZ4BlockInputStream makeInputStream(
+ ByteBuffer buffer
+ )
+ throws IOException {
+ return new KafkaLZ4BlockInputStream(
+ buffer,
+ BufferSupplier.create(),
+ ignoreFlagDescriptorChecksum
+ );
}
@Test
- public void testKafkaLZ4() throws IOException {
- ByteArrayOutputStream output = new ByteArrayOutputStream();
- KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(output, this.useBrokenFlagDescriptorChecksum);
- lz4.write(this.payload, 0, this.payload.length);
- if (this.close) {
- lz4.close();
- } else {
- lz4.flush();
+ public void testNotSupported() throws Exception {
+ thrown.expect(IOException.class);
+ thrown.expectMessage(KafkaLZ4BlockInputStream.NOT_SUPPORTED);
+
+ byte[] compressed = compressedBytes();
+ compressed[0] = 0x00;
+
+ makeInputStream(ByteBuffer.wrap(compressed));
+ }
+
+ @Test
+ public void testBadFrameChecksum() throws Exception {
+ if (!ignoreFlagDescriptorChecksum) {
+ thrown.expect(IOException.class);
+ thrown.expectMessage(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH);
}
- byte[] compressed = output.toByteArray();
+
+ byte[] compressed = compressedBytes();
+ compressed[6] = (byte) 0xFF;
+
+ makeInputStream(ByteBuffer.wrap(compressed));
+ }
+
+ @Test
+ public void testBadBlockSize() throws Exception {
+ if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return;
+
+ thrown.expect(IOException.class);
+ thrown.expectMessage(CoreMatchers.containsString("exceeded max"));
+
+ byte[] compressed = compressedBytes();
+ final ByteBuffer buffer = ByteBuffer.wrap(compressed).order(ByteOrder.LITTLE_ENDIAN);
+
+ int blockSize = buffer.getInt(7);
+ blockSize = (blockSize & LZ4_FRAME_INCOMPRESSIBLE_MASK) | (1 << 24 & ~LZ4_FRAME_INCOMPRESSIBLE_MASK);
+ buffer.putInt(7, blockSize);
+
+ testDecompression(buffer);
+ }
+
+
+
+ @Test
+ public void testCompression() throws Exception {
+ byte[] compressed = compressedBytes();
// Check magic bytes stored as little-endian
int offset = 0;
@@ -138,16 +241,125 @@ public class KafkaLZ4Test {
assertEquals(0, compressed[offset++]);
assertEquals(0, compressed[offset++]);
}
+ }
+
+ @Test
+ public void testArrayBackedBuffer() throws IOException {
+ byte[] compressed = compressedBytes();
+ testDecompression(ByteBuffer.wrap(compressed));
+ }
+
+ @Test
+ public void testArrayBackedBufferSlice() throws IOException {
+ byte[] compressed = compressedBytes();
+
+ int sliceOffset = 12;
+
+ ByteBuffer buffer = ByteBuffer.allocate(compressed.length + sliceOffset + 123);
+ buffer.position(sliceOffset);
+ buffer.put(compressed).flip();
+ buffer.position(sliceOffset);
+
+ ByteBuffer slice = buffer.slice();
+ testDecompression(slice);
+
+ int offset = 42;
+ buffer = ByteBuffer.allocate(compressed.length + sliceOffset + offset);
+ buffer.position(sliceOffset + offset);
+ buffer.put(compressed).flip();
+ buffer.position(sliceOffset);
- ByteArrayInputStream input = new ByteArrayInputStream(compressed);
+ slice = buffer.slice();
+ slice.position(offset);
+ testDecompression(slice);
+ }
+
+ @Test
+ public void testDirectBuffer() throws IOException {
+ byte[] compressed = compressedBytes();
+ ByteBuffer buffer;
+
+ buffer = ByteBuffer.allocateDirect(compressed.length);
+ buffer.put(compressed).flip();
+ testDecompression(buffer);
+
+ int offset = 42;
+ buffer = ByteBuffer.allocateDirect(compressed.length + offset + 123);
+ buffer.position(offset);
+ buffer.put(compressed).flip();
+ buffer.position(offset);
+ testDecompression(buffer);
+ }
+
+ @Test
+ public void testSkip() throws Exception {
+ if (!close || (useBrokenFlagDescriptorChecksum && !ignoreFlagDescriptorChecksum)) return;
+
+ final KafkaLZ4BlockInputStream in = makeInputStream(ByteBuffer.wrap(compressedBytes()));
+
+ int n = 100;
+ int remaining = payload.length;
+ long skipped = in.skip(n);
+ assertEquals(Math.min(n, remaining), skipped);
+
+ n = 10000;
+ remaining -= skipped;
+ skipped = in.skip(n);
+ assertEquals(Math.min(n, remaining), skipped);
+ }
+
+ private void testDecompression(ByteBuffer buffer) throws IOException {
+ IOException error = null;
try {
- KafkaLZ4BlockInputStream decompressed = new KafkaLZ4BlockInputStream(input, this.ignoreFlagDescriptorChecksum);
- byte[] testPayload = new byte[this.payload.length];
- int ret = decompressed.read(testPayload, 0, this.payload.length);
- assertEquals(ret, this.payload.length);
+ KafkaLZ4BlockInputStream decompressed = makeInputStream(buffer);
+
+ byte[] testPayload = new byte[payload.length];
+
+ byte[] tmp = new byte[1024];
+ int n, pos = 0, i = 0;
+ while ((n = decompressed.read(tmp, i, tmp.length - i)) != -1) {
+ i += n;
+ if (i == tmp.length) {
+ System.arraycopy(tmp, 0, testPayload, pos, i);
+ pos += i;
+ i = 0;
+ }
+ }
+ System.arraycopy(tmp, 0, testPayload, pos, i);
+ pos += i;
+
+ assertEquals(-1, decompressed.read(tmp, 0, tmp.length));
+ assertEquals(this.payload.length, pos);
assertArrayEquals(this.payload, testPayload);
} catch (IOException e) {
- assertTrue(this.useBrokenFlagDescriptorChecksum && !this.ignoreFlagDescriptorChecksum);
+ if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) {
+ assertEquals(KafkaLZ4BlockInputStream.DESCRIPTOR_HASH_MISMATCH, e.getMessage());
+ error = e;
+ } else if (!close) {
+ assertEquals(KafkaLZ4BlockInputStream.PREMATURE_EOS, e.getMessage());
+ error = e;
+ } else {
+ throw e;
+ }
+ }
+ if (!ignoreFlagDescriptorChecksum && useBrokenFlagDescriptorChecksum) assertNotNull(error);
+ if (!close) assertNotNull(error);
+ }
+
+ private byte[] compressedBytes() throws IOException {
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ KafkaLZ4BlockOutputStream lz4 = new KafkaLZ4BlockOutputStream(
+ output,
+ KafkaLZ4BlockOutputStream.BLOCKSIZE_64KB,
+ blockChecksum,
+ useBrokenFlagDescriptorChecksum
+ );
+ lz4.write(this.payload, 0, this.payload.length);
+ if (this.close) {
+ lz4.close();
+ } else {
+ lz4.flush();
}
+ return output.toByteArray();
}
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
index 5f668de..c2ef38b 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java
@@ -71,7 +71,7 @@ public class MemoryRecordsTest {
for (int iteration = 0; iteration < 2; iteration++) {
for (MemoryRecords recs : asList(recs1, recs2)) {
- Iterator<LogEntry> iter = recs.deepEntries().iterator();
+ Iterator<LogEntry> iter = recs.deepEntries(BufferSupplier.NO_CACHING).iterator();
for (int i = 0; i < list.size(); i++) {
assertTrue(iter.hasNext());
LogEntry entry = iter.next();
@@ -201,7 +201,7 @@ public class MemoryRecordsTest {
shallowEntry.record().timestampType());
}
- List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries());
+ List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries(BufferSupplier.NO_CACHING));
assertEquals(4, deepEntries.size());
LogEntry first = deepEntries.get(0);
@@ -263,7 +263,7 @@ public class MemoryRecordsTest {
}
}
- @Parameterized.Parameters
+ @Parameterized.Parameters(name = "{index} magic={0}, firstOffset={1}, compressionType={2}")
public static Collection<Object[]> data() {
List<Object[]> values = new ArrayList<>();
for (long firstOffset : asList(0L, 57L))
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/main/scala/kafka/log/LogValidator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogValidator.scala b/core/src/main/scala/kafka/log/LogValidator.scala
index 224a792..47d4bd4 100644
--- a/core/src/main/scala/kafka/log/LogValidator.scala
+++ b/core/src/main/scala/kafka/log/LogValidator.scala
@@ -164,7 +164,7 @@ private[kafka] object LogValidator {
val expectedInnerOffset = new LongRef(0)
val validatedRecords = new mutable.ArrayBuffer[Record]
- records.deepEntries(true).asScala.foreach { logEntry =>
+ records.deepEntries(true, BufferSupplier.NO_CACHING).asScala.foreach { logEntry =>
val record = logEntry.record
validateKey(record, compactedTopic)
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/main/scala/kafka/message/CompressionFactory.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/message/CompressionFactory.scala b/core/src/main/scala/kafka/message/CompressionFactory.scala
index e02ed63..592d09e 100644
--- a/core/src/main/scala/kafka/message/CompressionFactory.scala
+++ b/core/src/main/scala/kafka/message/CompressionFactory.scala
@@ -17,12 +17,11 @@
package kafka.message
-import java.io.OutputStream
-import java.util.zip.GZIPOutputStream
-import java.util.zip.GZIPInputStream
-import java.io.InputStream
+import java.io.{InputStream, OutputStream}
+import java.nio.ByteBuffer
+import java.util.zip.{GZIPInputStream, GZIPOutputStream}
-import org.apache.kafka.common.record.{KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
+import org.apache.kafka.common.record.{BufferSupplier, KafkaLZ4BlockInputStream, KafkaLZ4BlockOutputStream}
object CompressionFactory {
@@ -40,15 +39,15 @@ object CompressionFactory {
}
}
- def apply(compressionCodec: CompressionCodec, messageVersion: Byte, stream: InputStream): InputStream = {
+ def apply(compressionCodec: CompressionCodec, messageVersion: Byte, buffer: ByteBuffer): InputStream = {
compressionCodec match {
- case DefaultCompressionCodec => new GZIPInputStream(stream)
- case GZIPCompressionCodec => new GZIPInputStream(stream)
+ case DefaultCompressionCodec => new GZIPInputStream(new ByteBufferBackedInputStream(buffer))
+ case GZIPCompressionCodec => new GZIPInputStream(new ByteBufferBackedInputStream(buffer))
case SnappyCompressionCodec =>
import org.xerial.snappy.SnappyInputStream
- new SnappyInputStream(stream)
+ new SnappyInputStream(new ByteBufferBackedInputStream(buffer))
case LZ4CompressionCodec =>
- new KafkaLZ4BlockInputStream(stream, messageVersion == Message.MagicValue_V0)
+ new KafkaLZ4BlockInputStream(buffer, BufferSupplier.NO_CACHING, messageVersion == Message.MagicValue_V0)
case _ =>
throw new kafka.common.UnknownCodecException("Unknown Codec: " + compressionCodec)
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
index 5d2c8fb..9f63604 100644
--- a/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageCompressionTest.scala
@@ -18,9 +18,10 @@
package kafka.message
import org.apache.kafka.common.record._
-
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
+import java.nio.ByteBuffer
+
import scala.collection._
import org.scalatest.junit.JUnitSuite
import org.junit._
@@ -33,7 +34,7 @@ class MessageCompressionTest extends JUnitSuite {
val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayOutputStream())
assertTrue(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum())
- val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, 0x1A)))
+ val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V0, ByteBuffer.wrap(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, 0x1A)))
assertTrue(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum())
}
@@ -42,7 +43,7 @@ class MessageCompressionTest extends JUnitSuite {
val output = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayOutputStream())
assertFalse(output.asInstanceOf[KafkaLZ4BlockOutputStream].useBrokenFlagDescriptorChecksum())
- val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, new ByteArrayInputStream(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, -126)))
+ val input = CompressionFactory(LZ4CompressionCodec, Message.MagicValue_V1, ByteBuffer.wrap(Array[Byte](0x04, 0x22, 0x4D, 0x18, 0x60, 0x40, -126)))
assertFalse(input.asInstanceOf[KafkaLZ4BlockInputStream].ignoreFlagDescriptorChecksum())
}
http://git-wip-us.apache.org/repos/asf/kafka/blob/e944956a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
index a82a553..467f66c 100644
--- a/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
+++ b/core/src/test/scala/unit/kafka/message/MessageWriterTest.scala
@@ -60,7 +60,7 @@ class MessageWriterTest extends JUnitSuite {
}
private def decompress(compressed: Array[Byte], codec: CompressionCodec): Array[Byte] = {
- toArray(CompressionFactory(codec, Message.MagicValue_V1, new ByteArrayInputStream(compressed)))
+ toArray(CompressionFactory(codec, Message.MagicValue_V1, ByteBuffer.wrap(compressed)))
}
private def toArray(in: InputStream): Array[Byte] = {