You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/12/03 16:27:44 UTC
[kafka] branch 2.3 updated: KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)" (#7769)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new f8129f6 KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)" (#7769)
f8129f6 is described below
commit f8129f6fa82b8aeaca4d2eb182183a6b86a5598b
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Tue Dec 3 08:26:17 2019 -0800
KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)" (#7769)
This reverts commit 90043d5f as it caused a regression in some cases:
> Caused by: java.io.IOException: Stream frame descriptor corrupted
> at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
> at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
> at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
I will investigate the root cause afterwards, but I want to get the safe fix into 2.4.0 first.
The reporter of KAFKA-9203 has verified that reverting this change
makes the problem go away.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../common/record/KafkaLZ4BlockInputStream.java | 33 +++++++++++++++++++---
1 file changed, 29 insertions(+), 4 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 9a37833..850b1e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -77,6 +77,11 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
this.bufferSupplier = bufferSupplier;
readHeader();
decompressionBuffer = bufferSupplier.get(maxBlockSize);
+ if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
+ // require array backed decompression buffer with zero offset
+ // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
+ throw new RuntimeException("decompression buffer must have backing array with zero array offset");
+ }
finished = false;
}
@@ -126,7 +131,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
int len = in.position() - in.reset().position();
- int hash = CHECKSUM.hash(in, in.position(), len, 0);
+ int hash = in.hasArray() ?
+ // workaround for https://github.com/lz4/lz4-java/pull/65
+ CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
+ CHECKSUM.hash(in, in.position(), len, 0);
in.position(in.position() + len);
if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
@@ -164,8 +172,22 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
if (compressed) {
try {
- final int bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0,
- maxBlockSize);
+ // workaround for https://github.com/lz4/lz4-java/pull/65
+ final int bufferSize;
+ if (in.hasArray()) {
+ bufferSize = DECOMPRESSOR.decompress(
+ in.array(),
+ in.position() + in.arrayOffset(),
+ blockSize,
+ decompressionBuffer.array(),
+ 0,
+ maxBlockSize
+ );
+ } else {
+ // decompressionBuffer has zero arrayOffset, so we don't need to worry about
+ // https://github.com/lz4/lz4-java/pull/65
+ bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
+ }
decompressionBuffer.position(0);
decompressionBuffer.limit(bufferSize);
decompressedBuffer = decompressionBuffer;
@@ -179,7 +201,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
// verify checksum
if (flg.isBlockChecksumSet()) {
- int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
+ // workaround for https://github.com/lz4/lz4-java/pull/65
+ int hash = in.hasArray() ?
+ CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
+ CHECKSUM.hash(in, in.position(), blockSize, 0);
in.position(in.position() + blockSize);
if (hash != in.getInt()) {
throw new IOException(BLOCK_HASH_MISMATCH);