You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@uniffle.apache.org by ro...@apache.org on 2022/11/06 07:33:56 UTC

[incubator-uniffle] branch master updated: [ISSUE-283][FEATURE] Support snappy compression/decompression (#304)

This is an automated email from the ASF dual-hosted git repository.

roryqi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-uniffle.git


The following commit(s) were added to refs/heads/master by this push:
     new fbe30748 [ISSUE-283][FEATURE] Support snappy compression/decompression (#304)
fbe30748 is described below

commit fbe307481f57cdc97816a207c2640281612d95c6
Author: Rui Wang <ru...@databricks.com>
AuthorDate: Sun Nov 6 00:33:52 2022 -0700

    [ISSUE-283][FEATURE] Support snappy compression/decompression (#304)
    
    ### What changes were proposed in this pull request?
    This PR adds the support of snappy compression/decompression based on the example of https://github.com/apache/incubator-uniffle/pull/254.
    
    ### Why are the changes needed?
    Add a new feature.
    
    ### Does this PR introduce _any_ user-facing change?
    No
    
    ### How was this patch tested?
    UT
---
 common/pom.xml                                     |  5 ++
 .../apache/uniffle/common/compression/Codec.java   | 13 +++++
 .../uniffle/common/compression/SnappyCodec.java    | 67 ++++++++++++++++++++++
 .../uniffle/common/config/RssClientConf.java       |  2 +-
 .../common/compression/CompressionTest.java        |  2 +-
 docs/client_guide.md                               |  2 +-
 pom.xml                                            |  7 +++
 7 files changed, 95 insertions(+), 3 deletions(-)

diff --git a/common/pom.xml b/common/pom.xml
index f043eb9c..aec42750 100644
--- a/common/pom.xml
+++ b/common/pom.xml
@@ -99,6 +99,11 @@
       <artifactId>zstd-jni</artifactId>
       <scope>provided</scope>
     </dependency>
+    <dependency>
+      <groupId>org.xerial.snappy</groupId>
+      <artifactId>snappy-java</artifactId>
+      <scope>provided</scope>
+    </dependency>
   </dependencies>
 
   <build>
diff --git a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
index 9ff7d85d..f5ebc02a 100644
--- a/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
+++ b/common/src/main/java/org/apache/uniffle/common/compression/Codec.java
@@ -31,6 +31,8 @@ public abstract class Codec {
     switch (type) {
       case ZSTD:
         return new ZstdCodec(rssConf.get(ZSTD_COMPRESSION_LEVEL));
+      case SNAPPY:
+        return new SnappyCodec();
       case NOOP:
         return new NoOpCodec();
       case LZ4:
@@ -39,13 +41,24 @@ public abstract class Codec {
     }
   }
 
+  /**
+   * Decompress the data held in {@code src} into {@code dest}.
+   * @param src buffer holding the compressed data
+   * @param uncompressedLen expected size of the data once decompressed
+   * @param dest buffer that receives the decompressed data
+   * @param destOffset offset in {@code dest} at which the decompressed data is written
+   */
   public abstract void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest, int destOffset);
 
+  /**
+   * Compress the bytes of {@code src} and return the result as a byte array.
+   */
   public abstract byte[] compress(byte[] src);
 
   public enum Type {
     LZ4,
     ZSTD,
     NOOP,
+    SNAPPY,
   }
 }
diff --git a/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java b/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
new file mode 100644
index 00000000..41e4a8a5
--- /dev/null
+++ b/common/src/main/java/org/apache/uniffle/common/compression/SnappyCodec.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.xerial.snappy.Snappy;
+
+import org.apache.uniffle.common.exception.RssException;
+
+public class SnappyCodec extends Codec {
+  /**
+   * Decompresses the remaining bytes of {@code src} into {@code dest} at {@code destOffset}. Throws
+   * {@link RssException} on Snappy failure, on a size mismatch with {@code uncompressedLen}, or on a
+   * non-zero offset with direct buffers; {@link IllegalStateException} if only one buffer is direct.
+   */
+  @Override
+  public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dest, int destOffset) {
+    if (src.isDirect() != dest.isDirect()) {
+      throw new IllegalStateException("Snappy only supports the same type of bytebuffer decompression.");
+    }
+    if (src.isDirect() && destOffset != 0) {
+      throw new RssException(
+        "Snappy decompression does not support non-zero offset for destination direct ByteBuffer");
+    }
+    final int size;
+    try {
+      // Heap path: add arrayOffset() so sliced/wrapped buffers hit the right backing-array region.
+      size = src.isDirect()
+          ? Snappy.uncompress(src, dest)
+          : Snappy.uncompress(src.array(), src.arrayOffset() + src.position(), src.remaining(),
+              dest.array(), dest.arrayOffset() + destOffset);
+    } catch (IOException e) {
+      throw new RssException("Failed to uncompress by Snappy", e);
+    }
+    if (size != uncompressedLen) {
+      throw new RssException(
+        "This should not happen that the decompressed data size is not equals to original size.");
+    }
+  }
+
+  /** Compresses {@code src} with Snappy and returns the compressed bytes. */
+  @Override
+  public byte[] compress(byte[] src) {
+    try {
+      return Snappy.compress(src);
+    } catch (IOException e) {
+      throw new RssException("Failed to compress by Snappy", e);
+    }
+  }
+}
diff --git a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
index cb6ec2e8..119ab4e6 100644
--- a/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
+++ b/common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java
@@ -29,7 +29,7 @@ public class RssClientConf {
       .enumType(Codec.Type.class)
       .defaultValue(LZ4)
       .withDescription("The compression codec is used to compress the shuffle data. "
-          + "Default codec is `LZ4`, `ZSTD` also can be used.");
+          + "Default codec is `LZ4`. Other options are`ZSTD` and `SNAPPY`.");
 
   public static final ConfigOption<Integer> ZSTD_COMPRESSION_LEVEL = ConfigOptions
       .key("rss.client.io.compression.zstd.level")
diff --git a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
index cb2fdc6f..6c3e8c32 100644
--- a/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
+++ b/common/src/test/java/org/apache/uniffle/common/compression/CompressionTest.java
@@ -35,7 +35,7 @@ public class CompressionTest {
 
   static List<Arguments> testCompression() {
     int[] sizes = {1, 1024, 128 * 1024, 512 * 1024, 1024 * 1024, 4 * 1024 * 1024};
-    Codec.Type[] types = {Codec.Type.ZSTD, Codec.Type.LZ4};
+    Codec.Type[] types = {Codec.Type.ZSTD, Codec.Type.LZ4, Codec.Type.SNAPPY};
 
     List<Arguments> arguments = new ArrayList<>();
     for (int size : sizes) {
diff --git a/docs/client_guide.md b/docs/client_guide.md
index 0699ce55..61cd6852 100644
--- a/docs/client_guide.md
+++ b/docs/client_guide.md
@@ -100,7 +100,7 @@ These configurations are shared by all types of clients.
 |<client_type>.rss.client.assignment.tags|-|The comma-separated list of tags for deciding assignment shuffle servers. Notice that the SHUFFLE_SERVER_VERSION will always as the assignment tag whether this conf is set or not|
 |<client_type>.rss.client.data.commit.pool.size|The number of assigned shuffle servers|The thread size for sending commit to shuffle servers|
 |<client_type>.rss.client.assignment.shuffle.nodes.max|-1|The number of required assignment shuffle servers. If it is less than 0 or equals to 0 or greater than the coordinator's config of "rss.coordinator.shuffle.nodes.max", it will use the size of "rss.coordinator.shuffle.nodes.max" default|
-|<client_type>.rss.client.io.compression.codec|lz4|The compression codec is used to compress the shuffle data. Default codec is `lz4`, `zstd` also can be used.|
+|<client_type>.rss.client.io.compression.codec|lz4|The compression codec is used to compress the shuffle data. Default codec is `lz4`. Other options are `ZSTD` and `SNAPPY`.|
 |<client_type>.rss.client.io.compression.zstd.level|3|The zstd compression level, the default level is 3|
 |<client_type>.rss.client.shuffle.data.distribution.type|NORMAL|The type of partition shuffle data distribution, including normal and local_order. The default value is normal. Now this config is only valid in Spark3.x|
 Notice:
diff --git a/pom.xml b/pom.xml
index 61e999c2..dc4f34a6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -76,6 +76,7 @@
     <spotbugs-maven-plugin.version>4.7.0.0</spotbugs-maven-plugin.version>
     <system-rules.version>1.19.0</system-rules.version>
     <zstd-jni.version>1.5.2-3</zstd-jni.version>
+    <snappy-java.version>1.1.8.4</snappy-java.version>
     <test.redirectToFile>true</test.redirectToFile>
     <trimStackTrace>false</trimStackTrace>
   </properties>
@@ -607,6 +608,12 @@
         <artifactId>zstd-jni</artifactId>
         <version>${zstd-jni.version}</version>
       </dependency>
+
+      <dependency>
+        <groupId>org.xerial.snappy</groupId>
+        <artifactId>snappy-java</artifactId>
+        <version>${snappy-java.version}</version>
+      </dependency>
     </dependencies>
   </dependencyManagement>