You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/06 07:33:56 UTC
[incubator-uniffle] branch master updated: [ISSUE-283][FEATURE] Support snappy compression/decompression (#304)
This is an automated email from the ASF dual-hosted git repository.
roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git
The following commit(s) were added to refs/heads/master by this push:
new fbe30748 [ISSUE-283][FEATURE] Support snappy compression/decompression (#304)
fbe30748 is described below
commit fbe307481f57cdc97816a207c2640281612d95c6
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Sun Nov 6 00:33:52 2022 -0700
[ISSUE-283][FEATURE] Support snappy compression/decompression (#304)
### What changes were proposed in this pull request?
This PR adds support for Snappy compression/decompression, following the example of https://github.com/apache/incubator-uniffle/pull/254.
### Why are the changes needed?
Add a new feature.
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
UT
---
common/pom.xml | 5 ++
.../apache/uniffle/common/compression/Codec.java | 13 +++++
.../uniffle/common/compression/SnappyCodec.java | 67 ++++++++++++++++++++++
.../uniffle/common/config/RssClientConf.java | 2 +-
.../common/compression/CompressionTest.java | 2 +-
docs/client_guide.md | 2 +-
pom.xml | 7 +++
7 files changed, 95 insertions(+), 3 deletions(-)
diff --git a/common/pom.xml b/common/pom.xml
index f043eb9c..aec42750 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -99,6 +99,11 @@
<artifactId>zstd-jni</artifactId>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
diff --git a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index 9ff7d85d..f5ebc02a 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -31,6 +31,8 @@ public abstract class Codec {
switch (type) {
case ZSTD:
return new ZstdCodec(rssConf.get(ZSTD_COMPRESSION_LEVEL));
+ case SNAPPY:
+ return new SnappyCodec();
case NOOP:
return new NoOpCodec();
case LZ4:
@@ -39,13 +41,24 @@ public abstract class Codec {
}
}
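+ /**
+ * Decompress the data in {@code src} into {@code dest}.
+ * @param src buffer holding the compressed data
+ * @param uncompressedLen expected length in bytes of the decompressed data
+ * @param dest buffer that receives the decompressed data
+ * @param destOffset offset in {@code dest} at which the decompressed data is written
+ */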
public abstract void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest, int destOffset);
+ /**
+ * Compress bytes into a byte array.
+ */
public abstract byte[] compress(byte[] src);
public enum Type {
LZ4,
ZSTD,
NOOP,
+ SNAPPY,
}
}
diff --git a/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java b/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
new file mode 100644
index 00000000..41e4a8a5
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.xerial.snappy.Snappy;
+
+import org.apache.uniffle.common.exception.RssException;
+
+/**
+ * {@link Codec} implementation backed by the snappy-java library.
+ *
+ * <p>Decompression requires the source and destination buffers to be of the same kind: either
+ * both heap buffers or both direct buffers.
+ */
+public class SnappyCodec extends Codec {
+
+  /**
+   * Decompresses the bytes of {@code src} between its position and limit into {@code dest}.
+   *
+   * @param src buffer holding the Snappy-compressed data
+   * @param uncompressedLen number of bytes the decompressed data is expected to have
+   * @param dest buffer receiving the decompressed data
+   * @param destOffset index in {@code dest} at which writing starts; must be 0 for direct buffers
+   * @throws RssException if Snappy fails, if the decompressed size differs from
+   *     {@code uncompressedLen}, or if a non-zero {@code destOffset} is used with direct buffers
+   * @throws IllegalStateException if exactly one of the two buffers is a direct buffer
+   */
+  @Override
+  public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest, int destOffset) {
+    if (src.isDirect() != dest.isDirect()) {
+      throw new IllegalStateException("Snappy only supports the same type of bytebuffer decompression.");
+    }
+    try {
+      int size;
+      if (src.isDirect()) {
+        if (destOffset != 0) {
+          throw new RssException(
+              "Snappy decompression does not support non-zero offset for destination direct ByteBuffer");
+        }
+        size = Snappy.uncompress(src, dest);
+      } else {
+        // A heap buffer may be a view over a larger array (e.g. created by slice()), so buffer
+        // indices are shifted by arrayOffset() to address the backing array correctly.
+        size = Snappy.uncompress(
+            src.array(), src.arrayOffset() + src.position(), src.remaining(),
+            dest.array(), dest.arrayOffset() + destOffset);
+      }
+      if (size != uncompressedLen) {
+        throw new RssException(
+            "This should not happen that the decompressed data size is not equals to original size.");
+      }
+    } catch (IOException e) {
+      throw new RssException("Failed to uncompress by Snappy", e);
+    }
+  }
+
+  /**
+   * Compresses {@code src} with Snappy.
+   *
+   * @param src bytes to compress
+   * @return the array of compressed bytes returned by Snappy
+   * @throws RssException if Snappy reports an I/O failure while compressing
+   */
+  @Override
+  public byte[] compress(byte[] src) {
+    try {
+      return Snappy.compress(src);
+    } catch (IOException e) {
+      // The message names the operation that actually failed (compression, not decompression).
+      throw new RssException("Failed to compress by Snappy", e);
+    }
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index cb6ec2e8..119ab4e6 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -29,7 +29,7 @@ public class RssClientConf {
.enumType(Codec.Type.class)
.defaultValue(LZ4)
.withDescription("The compression codec is used to compress the shuffle data. "
- + "Default codec is `LZ4`, `ZSTD` also can be used.");
+ + "Default codec is `LZ4`. Other options are`ZSTD` and `SNAPPY`.");
public static final ConfigOption<Integer> ZSTD_COMPRESSION_LEVEL = ConfigOptions
.key("rss.client.io.compression.zstd.level")
diff --git a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
index cb2fdc6f..6c3e8c32 100644
--- a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
@@ -35,7 +35,7 @@ public class CompressionTest {
static List<Arguments> testCompression() {
int[] sizes = {1, 1024, 128 * 1024, 512 * 1024, 1024 * 1024, 4 * 1024 * 1024};
- Codec.Type[] types = {Codec.Type.ZSTD, Codec.Type.LZ4};
+ Codec.Type[] types = {Codec.Type.ZSTD, Codec.Type.LZ4, Codec.Type.SNAPPY};
List<Arguments> arguments = new ArrayList<>();
for (int size : sizes) {
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 0699ce55..61cd6852 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -100,7 +100,7 @@ These configurations are shared by all types of clients.
|<client_type>.rss.client.assignment.tags|-|The comma-separated list of tags for deciding assignment shuffle servers. Notice that the SHUFFLE_SERVER_VERSION will always as the assignment tag whether this conf is set or not|
|<client_type>.rss.client.data.commit.pool.size|The number of assigned shuffle servers|The thread size for sending commit to shuffle servers|
|<client_type>.rss.client.assignment.shuffle.nodes.max|-1|The number of required assignment shuffle servers. If it is less than 0 or equals to 0 or greater than the coordinator's config of "rss.coordinator.shuffle.nodes.max", it will use the size of "rss.coordinator.shuffle.nodes.max" default|
-|<client_type>.rss.client.io.compression.codec|lz4|The compression codec is used to compress the shuffle data. Default codec is `lz4`, `zstd` also can be used.|
+|<client_type>.rss.client.io.compression.codec|lz4|The compression codec is used to compress the shuffle data. Default codec is `lz4`. Other options are `ZSTD` and `SNAPPY`.|
|<client_type>.rss.client.io.compression.zstd.level|3|The zstd compression level, the default level is 3|
|<client_type>.rss.client.shuffle.data.distribution.type|NORMAL|The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Now this config is only valid in Spark3.x|
Notice:
diff --git a/pom.xml b/pom.xml
index 61e999c2..dc4f34a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@
<spotbugs-maven-plugin.version>4.7.0.0</spotbugs-maven-plugin.version>
<system-rules.version>1.19.0</system-rules.version>
<zstd-jni.version>1.5.2-3</zstd-jni.version>
+ <snappy-java.version>1.1.8.4</snappy-java.version>
<test.redirectToFile>true</test.redirectToFile>
<trimStackTrace>false</trimStackTrace>
</properties>
@@ -607,6 +608,12 @@
<artifactId>zstd-jni</artifactId>
<version>${zstd-jni.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.xerial.snappy</groupId>
+ <artifactId>snappy-java</artifactId>
+ <version>${snappy-java.version}</version>
+ </dependency>
</dependencies>
</dependencyManagement>