Posted to commits@kafka.apache.org by ij...@apache.org on 2019/12/03 16:27:44 UTC

[kafka] branch 2.3 updated: KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)" (#7769)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/2.3 by this push:
     new f8129f6  KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)" (#7769)
f8129f6 is described below

commit f8129f6fa82b8aeaca4d2eb182183a6b86a5598b
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Tue Dec 3 08:26:17 2019 -0800

    KAFKA-9203: Revert "MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)" (#7769)
    
    This reverts commit 90043d5f as it caused a regression in some cases:
    
    > Caused by: java.io.IOException: Stream frame descriptor corrupted
    >         at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.readHeader(KafkaLZ4BlockInputStream.java:132)
    >         at org.apache.kafka.common.record.KafkaLZ4BlockInputStream.<init>(KafkaLZ4BlockInputStream.java:78)
    >         at org.apache.kafka.common.record.CompressionType$4.wrapForInput(CompressionType.java:110)
    
    I will investigate the root cause afterwards, but I want to get the safe fix into 2.4.0.
    The reporter of KAFKA-9203 has verified that reverting this change
    makes the problem go away.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
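
For background, the lz4-java issue referenced in the commit message
(https://github.com/lz4/lz4-java/pull/65) affects ByteBuffers whose backing
array begins at a non-zero arrayOffset, as produced by ByteBuffer.slice():
the library's ByteBuffer overloads could address the wrong region of the
backing array. A minimal sketch of how such a buffer arises and of the
restored hasArray() workaround (class and variable names are illustrative;
XXHash32 is the same net.jpountz API the CHECKSUM field in the diff below
uses):

    import java.nio.ByteBuffer;
    import net.jpountz.xxhash.XXHash32;
    import net.jpountz.xxhash.XXHashFactory;

    public class ArrayOffsetSketch {
        public static void main(String[] args) {
            // Slicing an array-backed buffer yields a view over the same
            // array whose arrayOffset() is no longer zero.
            ByteBuffer whole = ByteBuffer.allocate(64);
            whole.position(16);
            ByteBuffer in = whole.slice();
            System.out.println(in.arrayOffset()); // prints 16

            // Restored workaround: when a backing array exists, hash it
            // directly and account for arrayOffset() by hand instead of
            // trusting the ByteBuffer overload.
            XXHash32 checksum = XXHashFactory.fastestInstance().hash32();
            int len = in.remaining();
            int hash = in.hasArray()
                ? checksum.hash(in.array(), in.arrayOffset() + in.position(), len, 0)
                : checksum.hash(in, in.position(), len, 0);
            System.out.println(Integer.toHexString(hash));
        }
    }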
---
 .../common/record/KafkaLZ4BlockInputStream.java    | 33 +++++++++++++++++++---
 1 file changed, 29 insertions(+), 4 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 9a37833..850b1e9 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -77,6 +77,11 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
         this.bufferSupplier = bufferSupplier;
         readHeader();
         decompressionBuffer = bufferSupplier.get(maxBlockSize);
+        if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
+            // require array backed decompression buffer with zero offset
+            // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
+            throw new RuntimeException("decompression buffer must have backing array with zero array offset");
+        }
         finished = false;
     }
 
@@ -126,7 +131,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
 
         int len = in.position() - in.reset().position();
 
-        int hash = CHECKSUM.hash(in, in.position(), len, 0);
+        int hash = in.hasArray() ?
+                       // workaround for https://github.com/lz4/lz4-java/pull/65
+                       CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
+                       CHECKSUM.hash(in, in.position(), len, 0);
         in.position(in.position() + len);
         if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
             throw new IOException(DESCRIPTOR_HASH_MISMATCH);
@@ -164,8 +172,22 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
 
         if (compressed) {
             try {
-                final int bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0,
-                    maxBlockSize);
+                // workaround for https://github.com/lz4/lz4-java/pull/65
+                final int bufferSize;
+                if (in.hasArray()) {
+                    bufferSize = DECOMPRESSOR.decompress(
+                        in.array(),
+                        in.position() + in.arrayOffset(),
+                        blockSize,
+                        decompressionBuffer.array(),
+                        0,
+                        maxBlockSize
+                    );
+                } else {
+                    // decompressionBuffer has zero arrayOffset, so we don't need to worry about
+                    // https://github.com/lz4/lz4-java/pull/65
+                    bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
+                }
                 decompressionBuffer.position(0);
                 decompressionBuffer.limit(bufferSize);
                 decompressedBuffer = decompressionBuffer;
@@ -179,7 +201,10 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
 
         // verify checksum
         if (flg.isBlockChecksumSet()) {
-            int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
+            // workaround for https://github.com/lz4/lz4-java/pull/65
+            int hash = in.hasArray() ?
+                       CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
+                       CHECKSUM.hash(in, in.position(), blockSize, 0);
             in.position(in.position() + blockSize);
             if (hash != in.getInt()) {
                 throw new IOException(BLOCK_HASH_MISMATCH);
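
The block-decompression hunk above follows the same hasArray() pattern; a
self-contained sketch of that branch (method and variable names are
illustrative, assuming lz4-java's LZ4SafeDecompressor, which offers both
byte[] and ByteBuffer overloads):

    import java.nio.ByteBuffer;
    import net.jpountz.lz4.LZ4Factory;
    import net.jpountz.lz4.LZ4SafeDecompressor;

    public class DecompressSketch {
        private static final LZ4SafeDecompressor DECOMPRESSOR =
            LZ4Factory.fastestInstance().safeDecompressor();

        // Mirrors the restored branch: prefer the byte[] overload whenever a
        // backing array exists, adding arrayOffset() by hand (lz4-java PR 65).
        static int decompressBlock(ByteBuffer in, int blockSize,
                                   ByteBuffer out, int maxBlockSize) {
            if (in.hasArray()) {
                // `out` is assumed array-backed with zero arrayOffset, as the
                // constructor check above enforces for decompressionBuffer.
                return DECOMPRESSOR.decompress(in.array(), in.position() + in.arrayOffset(),
                    blockSize, out.array(), 0, maxBlockSize);
            }
            // Direct buffers have no backing array; the ByteBuffer overload is
            // safe here because `out` has a zero arrayOffset.
            return DECOMPRESSOR.decompress(in, in.position(), blockSize, out, 0, maxBlockSize);
        }
    }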