You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2020/12/23 02:33:56 UTC

[pulsar] 02/02: Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/pulsar.git

commit fbbc251de7e312b63382ad3940b62593a112a82e
Author: Enrico Olivelli <eo...@gmail.com>
AuthorDate: Tue Dec 22 18:53:27 2020 +0100

    Issue 8974: Peeking at compressed messages throws an exception (Readonly buffers not supported by Airlift) (#8990)
    
    Fixes #8974
    
    ### Motivation
    In certain cases, peeking messages on compressed topics returns an error (see #8974), because Airlift does not support read-only ByteBuffers: they do not give access to the underlying array.
    
    ### Modifications
    
    Copy the ByteBuffer when the buffer type is unsupported
    
    ### Verifying this change
    
    This change adds new tests that reproduce the error and demonstrate that the problem is fixed.
    
    (cherry picked from commit cbc606b0b0e836c1238ea1ba92400b3f14e5b349)
---
 .../pulsar/common/compression/AirliftUtils.java    | 38 ++++++++++++++++++++++
 .../common/compression/CompressionCodecLZ4.java    |  3 +-
 .../common/compression/CompressionCodecSnappy.java |  1 +
 .../common/compression/CompressionCodecZstd.java   |  2 +-
 .../common/compression/CompressorCodecTest.java    | 31 ++++++++++++++++++
 5 files changed, 73 insertions(+), 2 deletions(-)

diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
new file mode 100644
index 0000000..3bfc609
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/AirliftUtils.java
@@ -0,0 +1,38 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.compression;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Utilities.
+ */
+public abstract class AirliftUtils {
+    /** Returns {@code encodedNio} if direct or array-backed, else a heap copy of its remaining bytes. */
+    static ByteBuffer ensureAirliftSupported(ByteBuffer encodedNio, int uncompressedLength) {
+        if (encodedNio.isDirect() || encodedNio.hasArray()) {
+            return encodedNio;
+        }
+        // Airlift needs a raw byte array. Size the copy by the encoded bytes: compressed data can
+        // exceed uncompressedLength (kept for caller compatibility), which would overflow put().
+        ByteBuffer copy = ByteBuffer.allocate(encodedNio.remaining());
+        copy.put(encodedNio).flip();
+        return copy;
+    }
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
index 2493af4..12a03d1 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecLZ4.java
@@ -96,11 +96,12 @@ public class CompressionCodecLZ4 implements CompressionCodec {
         } else {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
-
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
             LZ4_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
         uncompressed.writerIndex(uncompressedLength);
         return uncompressed;
     }
+
 }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
index 517f1ca..1e31edc 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
@@ -103,6 +103,7 @@ public class CompressionCodecSnappy implements CompressionCodec {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
 
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
             SNAPPY_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
index 944e1e5..18caee6 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecZstd.java
@@ -97,7 +97,7 @@ public class CompressionCodecZstd implements CompressionCodec {
         } else {
             ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
             ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
-
+            encodedNio = AirliftUtils.ensureAirliftSupported(encodedNio, uncompressedLength);
             ZSTD_DECOMPRESSOR.get().decompress(encodedNio, uncompressedNio);
         }
 
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 46e5718..ec84741 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -84,6 +84,37 @@ public class CompressorCodecTest {
     }
 
     @Test(dataProvider = "codec")
+    void testDecompressReadonlyByteBuf(CompressionType type, String compressedText) throws IOException {
+        // Round-trips the `text` fixture through the codec, decoding from a read-only view.
+        CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);
+        byte[] data = text.getBytes();
+        ByteBuf raw = PulsarByteBufAllocator.DEFAULT.directBuffer();
+        raw.writeBytes(data);
+
+        ByteBuf compressed = codec.encode(raw);
+        // Encoding must leave the source buffer's readable bytes untouched
+        assertEquals(raw.readableBytes(), data.length);
+        int compressedSize = compressed.readableBytes();
+        // Readonly ByteBuffers are not supported by AirLift
+        // https://github.com/apache/pulsar/issues/8974
+        ByteBuf compressedComplexByteBuf = compressed.asReadOnly();
+        ByteBuf uncompressed = codec.decode(compressedComplexByteBuf, data.length);
+        // Decoding must leave the compressed buffer's readable bytes untouched
+        assertEquals(compressed.readableBytes(), compressedSize);
+        assertEquals(uncompressed.readableBytes(), data.length);
+        assertEquals(uncompressed, raw);
+
+        raw.release();
+        compressed.release();
+        uncompressed.release();
+
+        // Verify compression codecs have the same behavior with buffers ref counting
+        assertEquals(raw.refCnt(), 0);
+        assertEquals(compressed.refCnt(), 0);
+        assertEquals(uncompressed.refCnt(), 0);
+    }
+
+    @Test(dataProvider = "codec")
     void testEmptyInput(CompressionType type, String compressedText) throws IOException {
         CompressionCodec codec = CompressionCodecProvider.getCompressionCodec(type);