You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@kafka.apache.org by ij...@apache.org on 2019/05/06 13:54:16 UTC
[kafka] branch trunk updated: MINOR: Remove workarounds for
lz4-java bug affecting byte buffers (#6679)
This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 90043d5 MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)
90043d5 is described below
commit 90043d5f7e1b09a959080c2064f758d5890fc454
Author: Ismael Juma <is...@juma.me.uk>
AuthorDate: Mon May 6 06:53:49 2019 -0700
MINOR: Remove workarounds for lz4-java bug affecting byte buffers (#6679)
lz4/lz4-java#65 was included in lz4-java 1.4.0.
Relying on existing tests for verification.
Reviewers: Manikumar Reddy <ma...@gmail.com>
---
.../common/record/KafkaLZ4BlockInputStream.java | 33 +++-------------------
1 file changed, 4 insertions(+), 29 deletions(-)
diff --git a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
index 850b1e9..9a37833 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/KafkaLZ4BlockInputStream.java
@@ -77,11 +77,6 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
this.bufferSupplier = bufferSupplier;
readHeader();
decompressionBuffer = bufferSupplier.get(maxBlockSize);
- if (!decompressionBuffer.hasArray() || decompressionBuffer.arrayOffset() != 0) {
- // require array backed decompression buffer with zero offset
- // to simplify workaround for https://github.com/lz4/lz4-java/pull/65
- throw new RuntimeException("decompression buffer must have backing array with zero array offset");
- }
finished = false;
}
@@ -131,10 +126,7 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
int len = in.position() - in.reset().position();
- int hash = in.hasArray() ?
- // workaround for https://github.com/lz4/lz4-java/pull/65
- CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), len, 0) :
- CHECKSUM.hash(in, in.position(), len, 0);
+ int hash = CHECKSUM.hash(in, in.position(), len, 0);
in.position(in.position() + len);
if (in.get() != (byte) ((hash >> 8) & 0xFF)) {
throw new IOException(DESCRIPTOR_HASH_MISMATCH);
@@ -172,22 +164,8 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
if (compressed) {
try {
- // workaround for https://github.com/lz4/lz4-java/pull/65
- final int bufferSize;
- if (in.hasArray()) {
- bufferSize = DECOMPRESSOR.decompress(
- in.array(),
- in.position() + in.arrayOffset(),
- blockSize,
- decompressionBuffer.array(),
- 0,
- maxBlockSize
- );
- } else {
- // decompressionBuffer has zero arrayOffset, so we don't need to worry about
- // https://github.com/lz4/lz4-java/pull/65
- bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0, maxBlockSize);
- }
+ final int bufferSize = DECOMPRESSOR.decompress(in, in.position(), blockSize, decompressionBuffer, 0,
+ maxBlockSize);
decompressionBuffer.position(0);
decompressionBuffer.limit(bufferSize);
decompressedBuffer = decompressionBuffer;
@@ -201,10 +179,7 @@ public final class KafkaLZ4BlockInputStream extends InputStream {
// verify checksum
if (flg.isBlockChecksumSet()) {
- // workaround for https://github.com/lz4/lz4-java/pull/65
- int hash = in.hasArray() ?
- CHECKSUM.hash(in.array(), in.arrayOffset() + in.position(), blockSize, 0) :
- CHECKSUM.hash(in, in.position(), blockSize, 0);
+ int hash = CHECKSUM.hash(in, in.position(), blockSize, 0);
in.position(in.position() + blockSize);
if (hash != in.getInt()) {
throw new IOException(BLOCK_HASH_MISMATCH);