You are viewing a plain text version of this content; the canonical link to the original message is available in the mailing-list archive.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/05/06 13:54:16 UTC

[kafka] branch trunk updated: MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)

This is an automated email from the ASF dual-hosted git repository.

ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 90043d5  MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)
90043d5 is described below

commit 90043d5f7e1b09a959080c2064f758d5890fc454
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon May 6 06:53:49 2019 -0700

    MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)
    
    lz4/lz4-java#65 was included in lz4-java 1.4.0.
    
    Relying on existing tests for verification.
    
    Reviewers: Manikumar Reddy <ma...@gmail.com>
---
 .../common/record/KafkaLZ4BlockInputStream.java    | 33 +++-------------------
 1 file changed, 4 insertions(+), 29 deletions(-)

diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 850b1e9..9a37833 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -77,11 +77,6 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
         this.bufferSupplier = bufferSupplier;
         readHeader();
         decompressionBuffer = bufferSupplier.get(maxBlockSize);
-        if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
-            // require array backed decompression buffer with zero offset
-            // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
-            throw new RuntimeException("decompression buffer must have backing array with zero array offset");
-        }
         finished = false;
     }
 
@@ -131,10 +126,7 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
 
         int len = in.position() - in.reset().position();
 
-        int hash = in.hasArray() ?
-                       // workaround for https://github.com/lz4/lz4-java/pull/65
-                       CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
-                       CHECKSUM.hash(in, in.position(), len, 0);
+        int hash = CHECKSUM.hash(in, in.position(), len, 0);
         in.position(in.position() + len);
         if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
             throw new IOException(DESCRIPTOR_HASH_MISMATCH);
@@ -172,22 +164,8 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
 
         if (compressed) {
             try {
-                // workaround for https://github.com/lz4/lz4-java/pull/65
-                final int bufferSize;
-                if (in.hasArray()) {
-                    bufferSize = DECOMPRESSOR.decompress(
-                        in.array(),
-                        in.position() + in.arrayOffset(),
-                        blockSize,
-                        decompressionBuffer.array(),
-                        0,
-                        maxBlockSize
-                    );
-                } else {
-                    // decompressionBuffer has zero arrayOffset, so we don't need to worry about
-                    // https://github.com/lz4/lz4-java/pull/65
-                    bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
-                }
+                final int bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0,
+                    maxBlockSize);
                 decompressionBuffer.position(0);
                 decompressionBuffer.limit(bufferSize);
                 decompressedBuffer = decompressionBuffer;
@@ -201,10 +179,7 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
 
         // verify checksum
         if (flg.isBlockChecksumSet()) {
-            // workaround for https://github.com/lz4/lz4-java/pull/65
-            int hash = in.hasArray() ?
-                       CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
-                       CHECKSUM.hash(in, in.position(), blockSize, 0);
+            int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
             in.position(in.position() + blockSize);
             if (hash != in.getInt()) {
                 throw new IOException(BLOCK_HASH_MISMATCH);