You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/21 00:00:11 UTC

[pulsar] branch master updated: [pulsar-common] Support Snappy compression for Java (#4259)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 706a18e  [pulsar-common] Support Snappy compression for Java (#4259)
706a18e is described below

commit 706a18e56b52468b1b48832115959983a3fffbe0
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue May 21 08:00:01 2019 +0800

    [pulsar-common] Support Snappy compression for Java (#4259)
    
    * Support Snappy compression for java.
    
    * Some minor fix to pass unit tests
    
    * Format the cpp code
    
    * Added support for c++ client
    
    * Format the cpp code
---
 build/docker/Dockerfile                            |  2 +-
 pom.xml                                            |  7 ++
 .../apache/pulsar/client/api/CompressionType.java  |  3 +
 .../apache/pulsar/client/api/ProducerBuilder.java  |  2 +
 pulsar-client-cpp/CMakeLists.txt                   | 14 ++++
 pulsar-client-cpp/docker/Dockerfile                |  8 +++
 pulsar-client-cpp/include/pulsar/CompressionType.h |  1 +
 .../include/pulsar/ProducerConfiguration.h         |  2 +
 pulsar-client-cpp/lib/CompressionCodec.cc          |  8 +++
 pulsar-client-cpp/lib/CompressionCodec.h           |  2 +
 pulsar-client-cpp/lib/CompressionCodecSnappy.cc    | 79 ++++++++++++++++++++++
 .../CompressionCodecSnappy.h}                      | 21 +++---
 pulsar-common/pom.xml                              |  5 ++
 .../apache/pulsar/common/api/proto/PulsarApi.java  |  3 +
 .../compression/CompressionCodecProvider.java      |  5 ++
 .../common/compression/CompressionCodecSnappy.java | 66 ++++++++++++++++++
 pulsar-common/src/main/proto/PulsarApi.proto       |  1 +
 .../common/compression/CompressorCodecTest.java    |  4 +-
 site2/docs/concepts-messaging.md                   |  1 +
 site2/docs/reference-cli-tools.md                  |  2 +-
 20 files changed, 220 insertions(+), 16 deletions(-)

diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile
index dd35794..bcb17c0 100644
--- a/build/docker/Dockerfile
+++ b/build/docker/Dockerfile
@@ -28,7 +28,7 @@ RUN apt-get update && \
                 liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \
                 libjsoncpp-dev libxml2-utils protobuf-compiler wget \
                 curl doxygen openjdk-8-jdk-headless clang-format-5.0 \
-                gnupg2 golang-1.10-go zip unzip libzstd-dev
+                gnupg2 golang-1.10-go zip unzip libzstd-dev libsnappy-dev
 
 # Compile and install gtest
 RUN cd /usr/src/gtest && cmake . && make && cp libgtest.a /usr/lib
diff --git a/pom.xml b/pom.xml
index 5b28f3d..ca7f482 100644
--- a/pom.xml
+++ b/pom.xml
@@ -194,6 +194,7 @@ flexible messaging model and an intuitive client API.</description>
     <jsonwebtoken.version>0.10.5</jsonwebtoken.version>
     <opencensus.version>0.18.0</opencensus.version>
     <zstd.version>1.3.7-3</zstd.version>
+    <snappy.version>1.1.1.3</snappy.version>
     <hbase.version>1.4.9</hbase.version>
 
     <!-- test dependencies -->
@@ -488,6 +489,12 @@ flexible messaging model and an intuitive client API.</description>
       </dependency>
 
       <dependency>
+        <groupId>org.xerial.snappy</groupId>
+        <artifactId>snappy-java</artifactId>
+        <version>${snappy.version}</version>
+      </dependency>
+
+      <dependency>
         <groupId>org.slf4j</groupId>
         <artifactId>slf4j-api</artifactId>
         <version>${slf4j.version}</version>
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
index 49eb925..180fe62 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
@@ -33,4 +33,7 @@ public enum CompressionType {
 
     /** Compress with Zstandard codec */
     ZSTD,
+
+    /** Compress with Snappy codec */
+    SNAPPY
 }
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 2224e51..a98036c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -260,6 +260,8 @@ public interface ProducerBuilder<T> extends Cloneable {
      * <li>{@link CompressionType#ZLIB}: Standard ZLib compression</li>
      * <li>{@link CompressionType#ZSTD} Compress with Zstandard codec. Since Pulsar 2.3. Zstd cannot be used if consumer
      * applications are not in version >= 2.3 as well</li>
+     * <li>{@link CompressionType#SNAPPY} Compress with Snappy codec. Since Pulsar 2.4. Snappy cannot be used if consumer
+     * applications are not in version >= 2.4 as well</li>
      * </ul>
      *
      * @param compressionType
diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 34c5809..57df5f5 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -120,6 +120,8 @@ endif()
 
 find_library(LIB_ZSTD zstd libzstd)
 
+find_library(LIB_SNAPPY snappy libsnappy)
+
 if (BUILD_PYTHON_WRAPPER)
     find_package(PythonLibs REQUIRED)
     MESSAGE(STATUS "PYTHON: " ${PYTHONLIBS_VERSION_STRING})
@@ -183,6 +185,12 @@ else ()
 endif ()
 MESSAGE(STATUS "HAS_ZSTD: ${HAS_ZSTD}")
 
+if (LIB_SNAPPY)
+    set(HAS_SNAPPY 1)
+else ()
+    set(HAS_SNAPPY 0)
+endif ()
+MESSAGE(STATUS "HAS_SNAPPY: ${HAS_SNAPPY}")
 
 set(ADDITIONAL_LIBRARIES $ENV{PULSAR_ADDITIONAL_LIBRARIES})
 link_directories( $ENV{PULSAR_ADDITIONAL_LIBRARY_PATH} )
@@ -241,6 +249,12 @@ endif ()
 
 add_definitions(-DHAS_ZSTD=${HAS_ZSTD})
 
+if (HAS_SNAPPY)
+    set(COMMON_LIBS ${COMMON_LIBS} ${LIB_SNAPPY} )
+endif ()
+
+add_definitions(-DHAS_SNAPPY=${HAS_SNAPPY})
+
 if(NOT APPLE AND NOT MSVC)
     set(COMMON_LIBS ${COMMON_LIBS} rt)
 endif ()
diff --git a/pulsar-client-cpp/docker/Dockerfile b/pulsar-client-cpp/docker/Dockerfile
index 965d8ff..043ebf0 100644
--- a/pulsar-client-cpp/docker/Dockerfile
+++ b/pulsar-client-cpp/docker/Dockerfile
@@ -149,6 +149,14 @@ RUN curl -O -L https://github.com/facebook/zstd/releases/download/v1.3.7/zstd-1.
     make install && \
     rm -rf /zstd-1.3.7 /zstd-1.3.7.tar.gz
 
+# Snappy
+RUN curl -O -L https://github.com/google/snappy/releases/download/1.1.3/snappy-1.1.3.tar.gz && \
+    tar xvfz snappy-1.1.3.tar.gz && \
+    cd snappy-1.1.3 && \
+    CFLAGS="-fPIC -O3" ./configure && \
+    make && make install && \
+    rm -rf /snappy-1.1.3 /snappy-1.1.3.tar.gz
+
 RUN pip install twine
 RUN pip install fastavro
 RUN pip install six
diff --git a/pulsar-client-cpp/include/pulsar/CompressionType.h b/pulsar-client-cpp/include/pulsar/CompressionType.h
index 2306b14..6fd663a 100644
--- a/pulsar-client-cpp/include/pulsar/CompressionType.h
+++ b/pulsar-client-cpp/include/pulsar/CompressionType.h
@@ -26,6 +26,7 @@ enum CompressionType
     CompressionLZ4 = 1,
     CompressionZLib = 2,
     CompressionZSTD = 3,
+    CompressionSNAPPY = 4
 };
 }
 
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 0708086..2de22d8 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -100,6 +100,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
      * <li>{@link CompressionZLib}: ZLib Compression http://zlib.net/</li>
      * <li>{@link CompressionZSTD}: Zstandard Compression  https://facebook.github.io/zstd/ (Since Pulsar 2.3.
      * Zstd cannot be used if consumer applications are not in version >= 2.3 as well)</li>
+     * <li>{@link CompressionSNAPPY}: Snappy Compression  https://google.github.io/snappy/ (Since Pulsar 2.4.
+     * Snappy cannot be used if consumer applications are not in version >= 2.4 as well)</li>
      * </ul>
      */
     ProducerConfiguration& setCompressionType(CompressionType compressionType);
diff --git a/pulsar-client-cpp/lib/CompressionCodec.cc b/pulsar-client-cpp/lib/CompressionCodec.cc
index 183439f..b092cf9 100644
--- a/pulsar-client-cpp/lib/CompressionCodec.cc
+++ b/pulsar-client-cpp/lib/CompressionCodec.cc
@@ -20,6 +20,7 @@
 #include "CompressionCodecLZ4.h"
 #include "CompressionCodecZLib.h"
 #include "CompressionCodecZstd.h"
+#include "CompressionCodecSnappy.h"
 
 #include <cassert>
 
@@ -30,6 +31,7 @@ CompressionCodecNone CompressionCodecProvider::compressionCodecNone_;
 CompressionCodecLZ4 CompressionCodecProvider::compressionCodecLZ4_;
 CompressionCodecZLib CompressionCodecProvider::compressionCodecZLib_;
 CompressionCodecZstd CompressionCodecProvider::compressionCodecZstd_;
+CompressionCodecSnappy CompressionCodecProvider::compressionCodecSnappy_;
 
 CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compressionType) {
     switch (compressionType) {
@@ -39,6 +41,8 @@ CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compression
             return compressionCodecZLib_;
         case CompressionZSTD:
             return compressionCodecZstd_;
+        case CompressionSNAPPY:
+            return compressionCodecSnappy_;
         default:
             return compressionCodecNone_;
     }
@@ -54,6 +58,8 @@ CompressionType CompressionCodecProvider::convertType(proto::CompressionType typ
             return CompressionZLib;
         case proto::ZSTD:
             return CompressionZSTD;
+        case proto::SNAPPY:
+            return CompressionSNAPPY;
     }
 }
 
@@ -67,6 +73,8 @@ proto::CompressionType CompressionCodecProvider::convertType(CompressionType typ
             return proto::ZLIB;
         case CompressionZSTD:
             return proto::ZSTD;
+        case CompressionSNAPPY:
+            return proto::SNAPPY;
     }
 }
 
diff --git a/pulsar-client-cpp/lib/CompressionCodec.h b/pulsar-client-cpp/lib/CompressionCodec.h
index 00dd013..fd65f9c 100644
--- a/pulsar-client-cpp/lib/CompressionCodec.h
+++ b/pulsar-client-cpp/lib/CompressionCodec.h
@@ -35,6 +35,7 @@ class CompressionCodecNone;
 class CompressionCodecLZ4;
 class CompressionCodecZLib;
 class CompressionCodecZstd;
+class CompressionCodecSnappy;
 
 class PULSAR_PUBLIC CompressionCodecProvider {
    public:
@@ -48,6 +49,7 @@ class PULSAR_PUBLIC CompressionCodecProvider {
     static CompressionCodecLZ4 compressionCodecLZ4_;
     static CompressionCodecZLib compressionCodecZLib_;
     static CompressionCodecZstd compressionCodecZstd_;
+    static CompressionCodecSnappy compressionCodecSnappy_;
 };
 
 class PULSAR_PUBLIC CompressionCodec {
diff --git a/pulsar-client-cpp/lib/CompressionCodecSnappy.cc b/pulsar-client-cpp/lib/CompressionCodecSnappy.cc
new file mode 100644
index 0000000..f7a53ef
--- /dev/null
+++ b/pulsar-client-cpp/lib/CompressionCodecSnappy.cc
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "CompressionCodecSnappy.h"
+
+#if HAS_SNAPPY
+#include <snappy.h>
+#include "snappy-c.h"
+
+namespace pulsar {
+
+/**
+ * Compress the readable bytes of `raw` with Snappy.
+ *
+ * The output buffer is allocated at the worst-case compressed length reported by snappy, and
+ * the number of bytes snappy actually produced is then recorded on it via bytesWritten().
+ * If snappy reports a failure the error is logged and the process is aborted.
+ *
+ * @param raw buffer whose readable bytes are compressed
+ * @return a newly allocated buffer holding the compressed data
+ */
+SharedBuffer CompressionCodecSnappy::encode(const SharedBuffer& raw) {
+    // snappy's C API works in size_t; keep the same type to avoid narrowing the length.
+    size_t maxCompressedSize = snappy_max_compressed_length(raw.readableBytes());
+    SharedBuffer compressed = SharedBuffer::allocate(maxCompressedSize);
+
+    // In/out argument: capacity of the output buffer on entry, compressed length on return.
+    size_t bytesWritten = maxCompressedSize;
+
+    snappy_status status =
+        snappy_compress(raw.data(), raw.readableBytes(), compressed.mutableData(), &bytesWritten);
+
+    if (status != SNAPPY_OK) {
+        // NOTE(review): no logging header is included by this file as shown here - confirm
+        // that LOG_ERROR is reachable through CompressionCodecSnappy.h.
+        LOG_ERROR("Failed to compress to snappy. res=" << status);
+        abort();
+    }
+
+    compressed.bytesWritten(bytesWritten);
+
+    return compressed;
+}
+
+/**
+ * Decompress a Snappy-compressed buffer.
+ *
+ * @param encoded buffer whose readable bytes hold the compressed data
+ * @param uncompressedSize expected size of the decompressed data; used to size the output buffer
+ * @param decoded receives the decompressed buffer on success; left untouched on failure
+ * @return true if snappy decompressed the data successfully, false otherwise
+ */
+bool CompressionCodecSnappy::decode(const SharedBuffer& encoded, uint32_t uncompressedSize,
+                                    SharedBuffer& decoded) {
+    SharedBuffer decompressed = SharedBuffer::allocate(uncompressedSize);
+
+    // snappy_uncompress takes the output length as an in/out size_t*: capacity of the output
+    // buffer on entry, number of bytes actually produced on return.
+    size_t decompressedSize = uncompressedSize;
+    snappy_status status = snappy_uncompress(encoded.data(), encoded.readableBytes(),
+                                             decompressed.mutableData(), &decompressedSize);
+
+    if (status != SNAPPY_OK) {
+        // Decompression failed
+        return false;
+    }
+
+    decoded = decompressed;
+    // Expose only the bytes snappy actually wrote (never more than uncompressedSize).
+    decoded.setWriterIndex(decompressedSize);
+    return true;
+}
+}  // namespace pulsar
+
+#else  // No SNAPPY
+
+namespace pulsar {
+
+// Fallback compiled when HAS_SNAPPY is 0 (Snappy was not found at build time):
+// never compresses, always throws a string literal.
+SharedBuffer CompressionCodecSnappy::encode(const SharedBuffer& raw) {
+    throw "Snappy compression not supported";
+}
+
+// Fallback compiled when HAS_SNAPPY is 0: never decompresses and never returns a value,
+// always throws a string literal.
+bool CompressionCodecSnappy::decode(const SharedBuffer& encoded, uint32_t uncompressedSize,
+                                    SharedBuffer& decoded) {
+    throw "Snappy compression not supported";
+}
+}  // namespace pulsar
+
+#endif  // HAS_SNAPPY
diff --git a/pulsar-client-cpp/include/pulsar/CompressionType.h b/pulsar-client-cpp/lib/CompressionCodecSnappy.h
similarity index 74%
copy from pulsar-client-cpp/include/pulsar/CompressionType.h
copy to pulsar-client-cpp/lib/CompressionCodecSnappy.h
index 2306b14..4738376 100644
--- a/pulsar-client-cpp/include/pulsar/CompressionType.h
+++ b/pulsar-client-cpp/lib/CompressionCodecSnappy.h
@@ -16,17 +16,16 @@
  * specific language governing permissions and limitations
  * under the License.
  */
-#ifndef PULSAR_COMPRESSIONTYPE_H_
-#define PULSAR_COMPRESSIONTYPE_H_
+#pragma once
+
+#include "CompressionCodec.h"
 
 namespace pulsar {
-enum CompressionType
-{
-    CompressionNone = 0,
-    CompressionLZ4 = 1,
-    CompressionZLib = 2,
-    CompressionZSTD = 3,
-};
-}
 
-#endif /* PULSAR_COMPRESSIONTYPE_H_ */
+// CompressionCodec implementation backed by Snappy. When the library is built with
+// HAS_SNAPPY set to 0, both methods throw instead of (de)compressing.
+class CompressionCodecSnappy : public CompressionCodec {
+   public:
+    // Returns a newly allocated buffer holding the Snappy-compressed form of the readable
+    // bytes of `raw`.
+    SharedBuffer encode(const SharedBuffer& raw);
+
+    // Decompresses the readable bytes of `encoded` into a buffer allocated with
+    // `uncompressedSize` bytes. On success the result is assigned to `decoded` and true is
+    // returned; if snappy reports an error, false is returned.
+    bool decode(const SharedBuffer& encoded, uint32_t uncompressedSize, SharedBuffer& decoded);
+};
+}  // namespace pulsar
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index abeb2a6..dbf7d40 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -83,6 +83,11 @@
       <groupId>com.github.luben</groupId>
       <artifactId>zstd-jni</artifactId>
     </dependency>
+
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+    </dependency>
     
     <dependency>
       <groupId>org.bouncycastle</groupId>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 625752b..d55f5f9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -14,12 +14,14 @@ public final class PulsarApi {
     LZ4(1, 1),
     ZLIB(2, 2),
     ZSTD(3, 3),
+    SNAPPY(4, 4),
     ;
     
     public static final int NONE_VALUE = 0;
     public static final int LZ4_VALUE = 1;
     public static final int ZLIB_VALUE = 2;
     public static final int ZSTD_VALUE = 3;
+    public static final int SNAPPY_VALUE = 4;
     
     
     public final int getNumber() { return value; }
@@ -30,6 +32,7 @@ public final class PulsarApi {
         case 1: return LZ4;
         case 2: return ZLIB;
         case 3: return ZSTD;
+        case 4: return SNAPPY;
         default: return null;
       }
     }
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
index bb8e312..f108f2d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
@@ -35,6 +35,7 @@ public class CompressionCodecProvider {
         codecs.put(PulsarApi.CompressionType.LZ4, new CompressionCodecLZ4());
         codecs.put(PulsarApi.CompressionType.ZLIB, new CompressionCodecZLib());
         codecs.put(PulsarApi.CompressionType.ZSTD, new CompressionCodecZstd());
+        codecs.put(PulsarApi.CompressionType.SNAPPY, new CompressionCodecSnappy());
     }
 
     public static CompressionCodec getCompressionCodec(PulsarApi.CompressionType type) {
@@ -55,6 +56,8 @@ public class CompressionCodecProvider {
             return PulsarApi.CompressionType.ZLIB;
         case ZSTD:
             return PulsarApi.CompressionType.ZSTD;
+        case SNAPPY:
+            return PulsarApi.CompressionType.SNAPPY;
 
         default:
             throw new RuntimeException("Invalid compression type");
@@ -71,6 +74,8 @@ public class CompressionCodecProvider {
             return CompressionType.ZLIB;
         case ZSTD:
             return CompressionType.ZSTD;
+        case SNAPPY:
+            return CompressionType.SNAPPY;
 
         default:
             throw new RuntimeException("Invalid compression type");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
new file mode 100644
index 0000000..1598d5e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.compression;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import lombok.extern.slf4j.Slf4j;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Snappy Compression
+ */
+@Slf4j
+public class CompressionCodecSnappy implements CompressionCodec {
+
+    @Override
+    public ByteBuf encode(ByteBuf source) {
+        int uncompressedLength = source.readableBytes();
+        int maxLength = Snappy.maxCompressedLength(uncompressedLength);
+
+        ByteBuffer sourceNio = source.nioBuffer(source.readerIndex(), source.readableBytes());
+
+        ByteBuf target = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
+        ByteBuffer targetNio = target.nioBuffer(0, maxLength);
+
+        int compressedLength = 0;
+        try {
+            compressedLength = Snappy.compress(sourceNio, targetNio);
+        } catch (IOException e) {
+            log.error("Failed to compress to Snappy: {}", e.getMessage());
+        }
+        target.writerIndex(compressedLength);
+        return target;
+    }
+
+    @Override
+    public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException {
+        ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.buffer(uncompressedLength, uncompressedLength);
+        ByteBuffer uncompressedNio = uncompressed.nioBuffer(0, uncompressedLength);
+
+        ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
+        Snappy.uncompress(encodedNio, uncompressedNio);
+
+        uncompressed.writerIndex(uncompressedLength);
+        return uncompressed;
+    }
+}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 58238d7..3913b66 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -77,6 +77,7 @@ enum CompressionType {
 	LZ4    = 1;
 	ZLIB   = 2;
 	ZSTD   = 3;
+	SNAPPY   = 4;
 }
 
 message MessageMetadata {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 4dcc22d..b7e5c85 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -24,8 +24,6 @@ import static org.testng.Assert.assertTrue;
 import java.io.IOException;
 
 import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.compression.CompressionCodecProvider;
 import org.testng.annotations.DataProvider;
 import org.testng.annotations.Test;
 
@@ -39,7 +37,7 @@ public class CompressorCodecTest {
 
     @DataProvider(name = "codec")
     public Object[][] codecProvider() {
-        return new Object[][] { { CompressionType.NONE }, { CompressionType.LZ4 }, { CompressionType.ZLIB }, { CompressionType.ZSTD }};
+        return new Object[][] { { CompressionType.NONE }, { CompressionType.LZ4 }, { CompressionType.ZLIB }, { CompressionType.ZSTD }, { CompressionType.SNAPPY }};
     }
 
     @Test(dataProvider = "codec")
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 5bda2e9..dcd0792 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -45,6 +45,7 @@ Messages published by producers can be compressed during transportation in order
 * [LZ4](https://github.com/lz4/lz4)
 * [ZLIB](https://zlib.net/)
 * [ZSTD](https://facebook.github.io/zstd/)
+* [SNAPPY](https://google.github.io/snappy/)
 
 ### Batching
 
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index c297087..7bad0cf 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -444,7 +444,7 @@ Options
 |`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
 |`--auth_plugin`|Authentication plugin class name||
 |`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
-|`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB or ZSTD.||
+|`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB, ZSTD or SNAPPY.||
 |`--conf-file`|Configuration file||
 |`-k`, `--encryption-key-name`|The public key name to encrypt payload||
 |`-v`, `--encryption-key-value-file`|The file which contains the public key to encrypt payload||