You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/23 02:33:56 UTC
[pulsar] 02/02: Issue 8974: Peeking at compressed messages throws
an exception (Readonly buffers not supported by Airlift) (#8990)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git
commit fbbc251de7e312b63382ad3940b62593a112a82e
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Dec 22 18:53:27 2020 +0100
Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990)
Fixes #8974
### Motivation
In certain cases, peeking at messages on compressed topics returns an error (see #8974) because Airlift does not support read-only ByteBuffers, which do not give access to the underlying array.
### Modifications
Copy the ByteBuffer in case of an unsupported buffer type.
### Verifying this change
This change adds new tests that reproduce the error and demonstrate that the problem is fixed.
(cherry picked from commit cbc606b0b0e836c1238ea1ba92400b3f14e5b349)
---
.../pulsar/common/compression/AirliftUtils.java | 38 ++++++++++++++++++++++
.../common/compression/CompressionCodecLZ4.java | 3 +-
.../common/compression/CompressionCodecSnappy.java | 1 +
.../common/compression/CompressionCodecZstd.java | 2 +-
.../common/compression/CompressorCodecTest.java | 31 ++++++++++++++++++
5 files changed, 73 insertions(+), 2 deletions(-)
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
new file mode 100644
index 0000000..3bfc609
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.compression;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Helpers that adapt NIO buffers to the Airlift decompressors, which need a direct or array-backed buffer.
+ */
+public abstract class AirliftUtils {
+
+ static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio, int uncompressedLength) {
+ if (!encodedNio.isDirect() && !encodedNio.hasArray()) {
+ // airlift needs a raw byte array: copy the compressed bytes (sized by remaining(), not uncompressedLength)
+ ByteBuffer copy = ByteBuffer.allocate(encodedNio.remaining());
+ copy.put(encodedNio);
+ copy.flip();
+ encodedNio = copy;
+ }
+ return encodedNio;
+ }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
index 2493af4..12a03d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
@@ -96,11 +96,12 @@ public class CompressionCodecLZ4 implements CompressionCodec {
} else {
ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
-
+ encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
LZ4_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
}
uncompressed.writerIndex(uncompressedLength);
return uncompressed;
}
+
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
index 517f1ca..1e31edc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
@@ -103,6 +103,7 @@ public class CompressionCodecSnappy implements CompressionCodec {
ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
+ encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
SNAPPY_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
index 944e1e5..18caee6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
@@ -97,7 +97,7 @@ public class CompressionCodecZstd implements CompressionCodec {
} else {
ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
-
+ encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
ZSTD_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
}
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 46e5718..ec84741 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -84,6 +84,37 @@ public class CompressorCodecTest {
}
@Test(dataProvider = "codec")
+ void testDecompressReadonlyByteBuf(CompressionType type, String compressedText) throws IOException {
+ CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);
+ byte[] data = text.getBytes();
+ ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
+ raw.writeBytes(data);
+
+ ByteBuf compressed = codec.encode(raw);
+ assertEquals(raw.readableBytes(), data.length);
+
+ int compressedSize = compressed.readableBytes();
+ // Readonly ByteBuffers are not supported by AirLift
+ // https://github.com/apache/pulsar/issues/8974
+ ByteBuf compressedComplexByteBuf = compressed.asReadOnly();
+ ByteBuf uncompressed = codec.decode(compressedComplexByteBuf, data.length);
+
+ assertEquals(compressed.readableBytes(), compressedSize);
+
+ assertEquals(uncompressed.readableBytes(), data.length);
+ assertEquals(uncompressed, raw);
+
+ raw.release();
+ compressed.release();
+ uncompressed.release();
+
+ // Verify every buffer (input, compressed and decoded output) was fully released
+ assertEquals(raw.refCnt(), 0);
+ assertEquals(compressed.refCnt(), 0);
+ assertEquals(uncompressed.refCnt(), 0);
+ }
+
+ @Test(dataProvider = "codec")
void testEmptyInput(CompressionType type, String compressedText) throws IOException {
CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);