You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/21 00:00:11 UTC
[pulsar] branch master updated: [pulsar-common] Support Snappy
compression for Java (#4259)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 706a18e [pulsar-common] Support Snappy compression for Java (#4259)
706a18e is described below
commit 706a18e56b52468b1b48832115959983a3fffbe0
Author: Fangbin Sun <su...@gmail.com>
AuthorDate: Tue May 21 08:00:01 2019 +0800
[pulsar-common] Support Snappy compression for Java (#4259)
* Support Snappy compression for java.
* Some minor fix to pass unit tests
* Format the cpp code
* Added support for c++ client
* Format the cpp code
---
build/docker/Dockerfile | 2 +-
pom.xml | 7 ++
.../apache/pulsar/client/api/CompressionType.java | 3 +
.../apache/pulsar/client/api/ProducerBuilder.java | 2 +
pulsar-client-cpp/CMakeLists.txt | 14 ++++
pulsar-client-cpp/docker/Dockerfile | 8 +++
pulsar-client-cpp/include/pulsar/CompressionType.h | 1 +
.../include/pulsar/ProducerConfiguration.h | 2 +
pulsar-client-cpp/lib/CompressionCodec.cc | 8 +++
pulsar-client-cpp/lib/CompressionCodec.h | 2 +
pulsar-client-cpp/lib/CompressionCodecSnappy.cc | 79 ++++++++++++++++++++++
.../CompressionCodecSnappy.h} | 21 +++---
pulsar-common/pom.xml | 5 ++
.../apache/pulsar/common/api/proto/PulsarApi.java | 3 +
.../compression/CompressionCodecProvider.java | 5 ++
.../common/compression/CompressionCodecSnappy.java | 66 ++++++++++++++++++
pulsar-common/src/main/proto/PulsarApi.proto | 1 +
.../common/compression/CompressorCodecTest.java | 4 +-
site2/docs/concepts-messaging.md | 1 +
site2/docs/reference-cli-tools.md | 2 +-
20 files changed, 220 insertions(+), 16 deletions(-)
diff --git a/build/docker/Dockerfile b/build/docker/Dockerfile
index dd35794..bcb17c0 100644
--- a/build/docker/Dockerfile
+++ b/build/docker/Dockerfile
@@ -28,7 +28,7 @@ RUN apt-get update && \
liblog4cxx-dev libprotobuf-dev libboost-all-dev google-mock libgtest-dev \
libjsoncpp-dev libxml2-utils protobuf-compiler wget \
curl doxygen openjdk-8-jdk-headless clang-format-5.0 \
- gnupg2 golang-1.10-go zip unzip libzstd-dev
+ gnupg2 golang-1.10-go zip unzip libzstd-dev libsnappy-dev
# Compile and install gtest
RUN cd /usr/src/gtest && cmake . && make && cp libgtest.a /usr/lib
diff --git a/pom.xml b/pom.xml
index 5b28f3d..ca7f482 100644
--- a/pom.xml
+++ b/pom.xml
@@ -194,6 +194,7 @@ flexible messaging model and an intuitive client API.</description>
<jsonwebtoken.version>0.10.5</jsonwebtoken.version>
<opencensus.version>0.18.0</opencensus.version>
<zstd.version>1.3.7-3</zstd.version>
+ <snappy.version>1.1.1.3</snappy.version>
<hbase.version>1.4.9</hbase.version>
<!-- test dependencies -->
@@ -488,6 +489,12 @@ flexible messaging model and an intuitive client API.</description>
</dependency>
<dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy.version}</version>
+ </dependency>
+
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>${slf4j.version}</version>
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
index 49eb925..180fe62 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/CompressionType.java
@@ -33,4 +33,7 @@ public enum CompressionType {
/** Compress with Zstandard codec */
ZSTD,
+
+ /** Compress with Snappy codec */
+ SNAPPY
}
diff --git a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
index 2224e51..a98036c 100644
--- a/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
+++ b/pulsar-client-api/src/main/java/org/apache/pulsar/client/api/ProducerBuilder.java
@@ -260,6 +260,8 @@ public interface ProducerBuilder<T> extends Cloneable {
* <li>{@link CompressionType#ZLIB}: Standard ZLib compression</li>
* <li>{@link CompressionType#ZSTD} Compress with Zstandard codec. Since Pulsar 2.3. Zstd cannot be used if consumer
* applications are not in version >= 2.3 as well</li>
+ * <li>{@link CompressionType#SNAPPY} Compress with Snappy codec. Since Pulsar 2.4. Snappy cannot be used if consumer
+ * applications are not in version >= 2.4 as well</li>
* </ul>
*
* @param compressionType
diff --git a/pulsar-client-cpp/CMakeLists.txt b/pulsar-client-cpp/CMakeLists.txt
index 34c5809..57df5f5 100644
--- a/pulsar-client-cpp/CMakeLists.txt
+++ b/pulsar-client-cpp/CMakeLists.txt
@@ -120,6 +120,8 @@ endif()
find_library(LIB_ZSTD zstd libzstd)
+find_library(LIB_SNAPPY snappy libsnappy)
+
if (BUILD_PYTHON_WRAPPER)
find_package(PythonLibs REQUIRED)
MESSAGE(STATUS "PYTHON: " ${PYTHONLIBS_VERSION_STRING})
@@ -183,6 +185,12 @@ else ()
endif ()
MESSAGE(STATUS "HAS_ZSTD: ${HAS_ZSTD}")
+if (LIB_SNAPPY)
+ set(HAS_SNAPPY 1)
+else ()
+ set(HAS_SNAPPY 0)
+endif ()
+MESSAGE(STATUS "HAS_SNAPPY: ${HAS_SNAPPY}")
set(ADDITIONAL_LIBRARIES $ENV{PULSAR_ADDITIONAL_LIBRARIES})
link_directories( $ENV{PULSAR_ADDITIONAL_LIBRARY_PATH} )
@@ -241,6 +249,12 @@ endif ()
add_definitions(-DHAS_ZSTD=${HAS_ZSTD})
+if (HAS_SNAPPY)
+ set(COMMON_LIBS ${COMMON_LIBS} ${LIB_SNAPPY} )
+endif ()
+
+add_definitions(-DHAS_SNAPPY=${HAS_SNAPPY})
+
if(NOT APPLE AND NOT MSVC)
set(COMMON_LIBS ${COMMON_LIBS} rt)
endif ()
diff --git a/pulsar-client-cpp/docker/Dockerfile b/pulsar-client-cpp/docker/Dockerfile
index 965d8ff..043ebf0 100644
--- a/pulsar-client-cpp/docker/Dockerfile
+++ b/pulsar-client-cpp/docker/Dockerfile
@@ -149,6 +149,14 @@ RUN curl -O -L https://github.com/facebook/zstd/releases/download/v1.3.7/zstd-1.
make install && \
rm -rf /zstd-1.3.7 /zstd-1.3.7.tar.gz
+# Snappy
+RUN curl -O -L https://github.com/google/snappy/releases/download/1.1.3/snappy-1.1.3.tar.gz && \
+ tar xvfz snappy-1.1.3.tar.gz && \
+ cd snappy-1.1.3 && \
+ CFLAGS="-fPIC -O3" ./configure && \
+ make && make install && \
+ rm -rf /snappy-1.1.3 /snappy-1.1.3.tar.gz
+
RUN pip install twine
RUN pip install fastavro
RUN pip install six
diff --git a/pulsar-client-cpp/include/pulsar/CompressionType.h b/pulsar-client-cpp/include/pulsar/CompressionType.h
index 2306b14..6fd663a 100644
--- a/pulsar-client-cpp/include/pulsar/CompressionType.h
+++ b/pulsar-client-cpp/include/pulsar/CompressionType.h
@@ -26,6 +26,7 @@ enum CompressionType
CompressionLZ4 = 1,
CompressionZLib = 2,
CompressionZSTD = 3,
+ CompressionSNAPPY = 4
};
}
diff --git a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
index 0708086..2de22d8 100644
--- a/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
+++ b/pulsar-client-cpp/include/pulsar/ProducerConfiguration.h
@@ -100,6 +100,8 @@ class PULSAR_PUBLIC ProducerConfiguration {
* <li>{@link CompressionZLib}: ZLib Compression http://zlib.net/</li>
* <li>{@link CompressionZSTD}: Zstandard Compression https://facebook.github.io/zstd/ (Since Pulsar 2.3.
* Zstd cannot be used if consumer applications are not in version >= 2.3 as well)</li>
+ * <li>{@link CompressionSNAPPY}: Snappy Compression https://google.github.io/snappy/ (Since Pulsar 2.4.
+ * Snappy cannot be used if consumer applications are not in version >= 2.4 as well)</li>
* </ul>
*/
ProducerConfiguration& setCompressionType(CompressionType compressionType);
diff --git a/pulsar-client-cpp/lib/CompressionCodec.cc b/pulsar-client-cpp/lib/CompressionCodec.cc
index 183439f..b092cf9 100644
--- a/pulsar-client-cpp/lib/CompressionCodec.cc
+++ b/pulsar-client-cpp/lib/CompressionCodec.cc
@@ -20,6 +20,7 @@
#include "CompressionCodecLZ4.h"
#include "CompressionCodecZLib.h"
#include "CompressionCodecZstd.h"
+#include "CompressionCodecSnappy.h"
#include <cassert>
@@ -30,6 +31,7 @@ CompressionCodecNone CompressionCodecProvider::compressionCodecNone_;
CompressionCodecLZ4 CompressionCodecProvider::compressionCodecLZ4_;
CompressionCodecZLib CompressionCodecProvider::compressionCodecZLib_;
CompressionCodecZstd CompressionCodecProvider::compressionCodecZstd_;
+CompressionCodecSnappy CompressionCodecProvider::compressionCodecSnappy_;
CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compressionType) {
switch (compressionType) {
@@ -39,6 +41,8 @@ CompressionCodec& CompressionCodecProvider::getCodec(CompressionType compression
return compressionCodecZLib_;
case CompressionZSTD:
return compressionCodecZstd_;
+ case CompressionSNAPPY:
+ return compressionCodecSnappy_;
default:
return compressionCodecNone_;
}
@@ -54,6 +58,8 @@ CompressionType CompressionCodecProvider::convertType(proto::CompressionType typ
return CompressionZLib;
case proto::ZSTD:
return CompressionZSTD;
+ case proto::SNAPPY:
+ return CompressionSNAPPY;
}
}
@@ -67,6 +73,8 @@ proto::CompressionType CompressionCodecProvider::convertType(CompressionType typ
return proto::ZLIB;
case CompressionZSTD:
return proto::ZSTD;
+ case CompressionSNAPPY:
+ return proto::SNAPPY;
}
}
diff --git a/pulsar-client-cpp/lib/CompressionCodec.h b/pulsar-client-cpp/lib/CompressionCodec.h
index 00dd013..fd65f9c 100644
--- a/pulsar-client-cpp/lib/CompressionCodec.h
+++ b/pulsar-client-cpp/lib/CompressionCodec.h
@@ -35,6 +35,7 @@ class CompressionCodecNone;
class CompressionCodecLZ4;
class CompressionCodecZLib;
class CompressionCodecZstd;
+class CompressionCodecSnappy;
class PULSAR_PUBLIC CompressionCodecProvider {
public:
@@ -48,6 +49,7 @@ class PULSAR_PUBLIC CompressionCodecProvider {
static CompressionCodecLZ4 compressionCodecLZ4_;
static CompressionCodecZLib compressionCodecZLib_;
static CompressionCodecZstd compressionCodecZstd_;
+ static CompressionCodecSnappy compressionCodecSnappy_;
};
class PULSAR_PUBLIC CompressionCodec {
diff --git a/pulsar-client-cpp/lib/CompressionCodecSnappy.cc b/pulsar-client-cpp/lib/CompressionCodecSnappy.cc
new file mode 100644
index 0000000..f7a53ef
--- /dev/null
+++ b/pulsar-client-cpp/lib/CompressionCodecSnappy.cc
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+#include "CompressionCodecSnappy.h"
+
+#if HAS_SNAPPY
+#include <snappy.h>
+#include "snappy-c.h"
+
+namespace pulsar {
+
+SharedBuffer CompressionCodecSnappy::encode(const SharedBuffer& raw) {
+    // Get the max size of the compressed data and allocate a buffer to hold it
+    size_t maxCompressedSize = snappy_max_compressed_length(raw.readableBytes());
+    SharedBuffer compressed = SharedBuffer::allocate(maxCompressedSize);
+
+    // In/out parameter: output capacity on entry, compressed length on return.
+    // It has to be a size_t because snappy_compress() takes a size_t*.
+    size_t bytesWritten = maxCompressedSize;
+    snappy_status status =
+        snappy_compress(raw.data(), raw.readableBytes(), compressed.mutableData(), &bytesWritten);
+
+    if (status != SNAPPY_OK) {
+        LOG_ERROR("Failed to compress to snappy. res=" << status);
+        abort();
+    }
+
+    compressed.bytesWritten(bytesWritten);
+    return compressed;
+}
+
+bool CompressionCodecSnappy::decode(const SharedBuffer& encoded, uint32_t uncompressedSize,
+                                    SharedBuffer& decoded) {
+    SharedBuffer decompressed = SharedBuffer::allocate(uncompressedSize);
+    // snappy_uncompress() takes the output capacity through a size_t* and overwrites it with
+    // the number of bytes produced, so the uint32_t cannot be passed to it directly.
+    size_t outputLength = uncompressedSize;
+    snappy_status status = snappy_uncompress(encoded.data(), encoded.readableBytes(),
+                                             decompressed.mutableData(), &outputLength);
+    if (status != SNAPPY_OK) {
+        // Decompression failed: `decoded` is left untouched
+        return false;
+    }
+    decoded = decompressed;
+    decoded.setWriterIndex(uncompressedSize);
+    return true;
+}
+} // namespace pulsar
+
+#else // No SNAPPY
+
+namespace pulsar {
+// Fallback compiled when HAS_SNAPPY is 0: both codec operations throw a string literal.
+SharedBuffer CompressionCodecSnappy::encode(const SharedBuffer& raw) {
+ throw "Snappy compression not supported";
+}
+
+bool CompressionCodecSnappy::decode(const SharedBuffer& encoded, uint32_t uncompressedSize,
+ SharedBuffer& decoded) {
+ throw "Snappy compression not supported";
+}
+} // namespace pulsar
+
+#endif // HAS_SNAPPY
diff --git a/pulsar-client-cpp/include/pulsar/CompressionType.h b/pulsar-client-cpp/lib/CompressionCodecSnappy.h
similarity index 74%
copy from pulsar-client-cpp/include/pulsar/CompressionType.h
copy to pulsar-client-cpp/lib/CompressionCodecSnappy.h
index 2306b14..4738376 100644
--- a/pulsar-client-cpp/include/pulsar/CompressionType.h
+++ b/pulsar-client-cpp/lib/CompressionCodecSnappy.h
@@ -16,17 +16,16 @@
* specific language governing permissions and limitations
* under the License.
*/
-#ifndef PULSAR_COMPRESSIONTYPE_H_
-#define PULSAR_COMPRESSIONTYPE_H_
+#pragma once
+
+#include "CompressionCodec.h"
namespace pulsar {
-enum CompressionType
-{
- CompressionNone = 0,
- CompressionLZ4 = 1,
- CompressionZLib = 2,
- CompressionZSTD = 3,
-};
-}
-#endif /* PULSAR_COMPRESSIONTYPE_H_ */
+class CompressionCodecSnappy : public CompressionCodec {
+ public:
+ SharedBuffer encode(const SharedBuffer& raw);
+ // Both members are defined in CompressionCodecSnappy.cc, where they throw if HAS_SNAPPY is 0.
+ bool decode(const SharedBuffer& encoded, uint32_t uncompressedSize, SharedBuffer& decoded);
+};
+} // namespace pulsar
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index abeb2a6..dbf7d40 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -83,6 +83,11 @@
<groupId>com.github.luben</groupId>
<artifactId>zstd-jni</artifactId>
</dependency>
+
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ </dependency>
<dependency>
<groupId>org.bouncycastle</groupId>
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
index 625752b..d55f5f9 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java
@@ -14,12 +14,14 @@ public final class PulsarApi {
LZ4(1, 1),
ZLIB(2, 2),
ZSTD(3, 3),
+ SNAPPY(4, 4),
;
public static final int NONE_VALUE = 0;
public static final int LZ4_VALUE = 1;
public static final int ZLIB_VALUE = 2;
public static final int ZSTD_VALUE = 3;
+ public static final int SNAPPY_VALUE = 4;
public final int getNumber() { return value; }
@@ -30,6 +32,7 @@ public final class PulsarApi {
case 1: return LZ4;
case 2: return ZLIB;
case 3: return ZSTD;
+ case 4: return SNAPPY;
default: return null;
}
}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
index bb8e312..f108f2d 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecProvider.java
@@ -35,6 +35,7 @@ public class CompressionCodecProvider {
codecs.put(PulsarApi.CompressionType.LZ4, new CompressionCodecLZ4());
codecs.put(PulsarApi.CompressionType.ZLIB, new CompressionCodecZLib());
codecs.put(PulsarApi.CompressionType.ZSTD, new CompressionCodecZstd());
+ codecs.put(PulsarApi.CompressionType.SNAPPY, new CompressionCodecSnappy());
}
public static CompressionCodec getCompressionCodec(PulsarApi.CompressionType type) {
@@ -55,6 +56,8 @@ public class CompressionCodecProvider {
return PulsarApi.CompressionType.ZLIB;
case ZSTD:
return PulsarApi.CompressionType.ZSTD;
+ case SNAPPY:
+ return PulsarApi.CompressionType.SNAPPY;
default:
throw new RuntimeException("Invalid compression type");
@@ -71,6 +74,8 @@ public class CompressionCodecProvider {
return CompressionType.ZLIB;
case ZSTD:
return CompressionType.ZSTD;
+ case SNAPPY:
+ return CompressionType.SNAPPY;
default:
throw new RuntimeException("Invalid compression type");
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
new file mode 100644
index 0000000..1598d5e
--- /dev/null
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/compression/CompressionCodecSnappy.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.common.compression;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.buffer.PooledByteBufAllocator;
+import lombok.extern.slf4j.Slf4j;
+import org.xerial.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Snappy compression codec. Each method returns a newly allocated pooled buffer and does not release it.
+ */
+@Slf4j
+public class CompressionCodecSnappy implements CompressionCodec {
+
+    @Override
+    public ByteBuf encode(ByteBuf source) {
+        int uncompressedLength = source.readableBytes();
+        int maxLength = Snappy.maxCompressedLength(uncompressedLength);
+
+        ByteBuffer sourceNio = source.nioBuffer(source.readerIndex(), source.readableBytes());
+        ByteBuf target = PooledByteBufAllocator.DEFAULT.buffer(maxLength, maxLength);
+        ByteBuffer targetNio = target.nioBuffer(0, maxLength);
+
+        try {
+            target.writerIndex(Snappy.compress(sourceNio, targetNio));
+        } catch (IOException e) {
+            // Handing back an empty buffer would pass off a failed compression as a valid payload.
+            target.release();
+            throw new RuntimeException("Failed to compress to Snappy", e);
+        }
+        return target;
+    }
+
+    @Override
+    public ByteBuf decode(ByteBuf encoded, int uncompressedLength) throws IOException {
+        ByteBuf uncompressed = PooledByteBufAllocator.DEFAULT.buffer(uncompressedLength, uncompressedLength);
+        ByteBuffer encodedNio = encoded.nioBuffer(encoded.readerIndex(), encoded.readableBytes());
+        try {
+            Snappy.uncompress(encodedNio, uncompressed.nioBuffer(0, uncompressedLength));
+            return uncompressed.writerIndex(uncompressedLength);
+        } catch (IOException | RuntimeException e) {
+            uncompressed.release(); // do not leak the pooled buffer when decompression fails
+            throw e;
+        }
+    }
+}
diff --git a/pulsar-common/src/main/proto/PulsarApi.proto b/pulsar-common/src/main/proto/PulsarApi.proto
index 58238d7..3913b66 100644
--- a/pulsar-common/src/main/proto/PulsarApi.proto
+++ b/pulsar-common/src/main/proto/PulsarApi.proto
@@ -77,6 +77,7 @@ enum CompressionType {
LZ4 = 1;
ZLIB = 2;
ZSTD = 3;
+ SNAPPY = 4;
}
message MessageMetadata {
diff --git a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
index 4dcc22d..b7e5c85 100644
--- a/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
+++ b/pulsar-common/src/test/java/org/apache/pulsar/common/compression/CompressorCodecTest.java
@@ -24,8 +24,6 @@ import static org.testng.Assert.assertTrue;
import java.io.IOException;
import org.apache.pulsar.common.api.proto.PulsarApi.CompressionType;
-import org.apache.pulsar.common.compression.CompressionCodec;
-import org.apache.pulsar.common.compression.CompressionCodecProvider;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;
@@ -39,7 +37,7 @@ public class CompressorCodecTest {
@DataProvider(name = "codec")
public Object[][] codecProvider() {
- return new Object[][] { { CompressionType.NONE }, { CompressionType.LZ4 }, { CompressionType.ZLIB }, { CompressionType.ZSTD }};
+ return new Object[][] { { CompressionType.NONE }, { CompressionType.LZ4 }, { CompressionType.ZLIB }, { CompressionType.ZSTD }, { CompressionType.SNAPPY }};
}
@Test(dataProvider = "codec")
diff --git a/site2/docs/concepts-messaging.md b/site2/docs/concepts-messaging.md
index 5bda2e9..dcd0792 100644
--- a/site2/docs/concepts-messaging.md
+++ b/site2/docs/concepts-messaging.md
@@ -45,6 +45,7 @@ Messages published by producers can be compressed during transportation in order
* [LZ4](https://github.com/lz4/lz4)
* [ZLIB](https://zlib.net/)
* [ZSTD](https://facebook.github.io/zstd/)
+* [SNAPPY](https://google.github.io/snappy/)
### Batching
diff --git a/site2/docs/reference-cli-tools.md b/site2/docs/reference-cli-tools.md
index c297087..7bad0cf 100644
--- a/site2/docs/reference-cli-tools.md
+++ b/site2/docs/reference-cli-tools.md
@@ -444,7 +444,7 @@ Options
|`--auth_params`|Authentication parameters, whose format is determined by the implementation of method `configure` in authentication plugin class, for example "key1:val1,key2:val2" or "{"key1":"val1","key2":"val2"}.||
|`--auth_plugin`|Authentication plugin class name||
|`-b`, `--batch-time-window`|Batch messages in a window of the specified number of milliseconds|1|
-|`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB or ZSTD.||
+|`-z`, `--compression`|Compress messages’ payload. Possible values are NONE, LZ4, ZLIB, ZSTD or SNAPPY.||
|`--conf-file`|Configuration file||
|`-k`, `--encryption-key-name`|The public key name to encrypt payload||
|`-v`, `--encryption-key-value-file`|The file which contains the public key to encrypt payload||