You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@uniffle.apache.org by GitBox <gi...@apache.org> on 2022/10/08 07:43:53 UTC

[GitHub] [incubator-uniffle] zuston opened a new pull request, #254: Support ZSTD

zuston opened a new pull request, #254:
URL: https://github.com/apache/incubator-uniffle/pull/254

   <!--
   Thanks for sending a pull request!  Here are some tips for you:
     1. If this is your first time, please read our contributor guidelines: https://github.com/Tencent/Firestorm/blob/master/CONTRIBUTING.md
     2. Ensure you have added or run the appropriate tests for your PR
     3. If the PR is unfinished, add '[WIP]' in your PR title, e.g., '[WIP]XXXX Your PR title ...'.
     4. Be sure to keep the PR description updated to reflect all changes.
     5. Please write your PR title to summarize what this PR proposes.
     6. If possible, provide a concise example to reproduce the issue for a faster review.
   -->
   
   ### What changes were proposed in this pull request?
   1. Introduce the ZSTD compression
   2. Introduce the abstract interface of codec
   3. Recycle the buffer to optimize the performance
   <!--
   Please clarify what changes you are proposing. The purpose of this section is to outline the changes and how this PR fixes the issue.
   If possible, please consider writing useful notes for better and faster reviews in your PR. See the examples below.
     1. If you refactor some codes with changing classes, showing the class hierarchy will help reviewers.
     4. If you fix some SQL features, you can provide some references of other DBMSes.
     5. If there is design documentation, please add the link.
     6. If there is a discussion in the mailing list, please add the link.
   -->
   
   
   ### Why are the changes needed?
   ZSTD has a good tradeoff between compression ratio and de/compress speed. For reducing the shuffle-data stored size, it's necessary to support this compression algorithm.
   
   
   
   <!--
   Please clarify why the changes are needed. For instance,
     1. If you propose a new API, clarify the use case for a new API.
     2. If you fix a bug, you can clarify why it is a bug.
   -->
   
   
   ### Does this PR introduce _any_ user-facing change?
   Yes
   <!--
   Note that it means *any* user-facing change including all aspects such as the documentation fix.
   If yes, please clarify the previous behavior and the change this PR proposes - provide the console output, description and/or an example to show the behavior difference if possible.
   If possible, please also clarify if this is a user-facing change compared to the released versions or within the unreleased branches such as master.
   If no, write 'No'.
   -->
   
   
   ### How was this patch tested?
   <!--
   If tests were added, say they were added here. Please make sure to add some test cases that check the changes thoroughly including negative and positive cases if possible.
   If it was tested in a way different from regular unit tests, please clarify how you tested step by step, ideally copy and paste-able, so that other reviewers can test and check, and descendants can verify in the future.
   If tests were not added, please describe why they were not added and/or why it was difficult to add.
   -->
   Manual tests and UTs


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990892535


##########
common/src/main/java/org/apache/uniffle/common/compression/Lz4Compressor.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+
+public class Lz4Compressor implements Compressor {
+
+  private LZ4Compressor compressor;
+
+  public Lz4Compressor() {
+    this.compressor = LZ4Factory.fastestInstance().fastCompressor();

Review Comment:
   Updated.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r991006602


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java:
##########
@@ -278,4 +281,17 @@ public Double apply(String in) {
   public static TypedConfigBuilder<String> createStringBuilder(ConfigBuilder builder) {
     return builder.stringConf();
   }
+
+  public static RssConf toRssConf(SparkConf sparkConf) {

Review Comment:
   Why change conf design in this ZSTD PR?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990888882


##########
common/src/main/java/org/apache/uniffle/common/compression/Lz4Compressor.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+
+public class Lz4Compressor implements Compressor {
+
+  private LZ4Compressor compressor;
+
+  public Lz4Compressor() {
+    this.compressor = LZ4Factory.fastestInstance().fastCompressor();

Review Comment:
   Nice catch. 
   
   After reviewing the Spark related code, I think we should make `LZ4Factory.fastestInstance()` shareable.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1284813410

   Gentle ping @frankliee @jerqi .


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r994529827


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   Do we need to merge Compressor and Decompressor into one interface "Codec" like hadoop ?
   It is more concise and avoid to mix different pair of Compressor and Decompressor.
   
   @jerqi @zuston 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1286851866

   > Bug: I found the zstd has no such method of `decompressByteArray` in Spark2.4.6 zstd version of 1.3.2-2
   > 
   > To be compatible with the older version, I think I should use the reflection to check it.
   
   This problem will be fixed in the next PR. We could merge this firstly. 
   
   I have updated latest commit, could u help review @frankliee @jerqi . If having any problem, I think I could do quick fix in this weekend.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990635347


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -38,8 +38,12 @@
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.compression.CompressionFactory;
+import org.apache.uniffle.common.compression.Decompressor;
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_WRITER_BUFFER_SIZE;

Review Comment:
   Should we avoid static import here?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990720327


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -38,8 +38,12 @@
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.compression.CompressionFactory;
+import org.apache.uniffle.common.compression.Decompressor;
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_WRITER_BUFFER_SIZE;

Review Comment:
   We should avoid `static import` here first. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r1005138924


##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java:
##########
@@ -53,4 +60,35 @@ public static void setupServers() throws Exception {
   @Override
   public void updateRssStorage(SparkConf sparkConf) {
   }
+
+  /**
+   * Test different compression types with localfile rss mode.
+   * @throws Exception
+   */
+  @Override
+  public void run() throws Exception {
+    String fileName = generateTestFile();
+    SparkConf sparkConf = createSparkConf();
+    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+    List<Map> results = new ArrayList<>();
+    Map resultWithoutRss = runSparkApp(sparkConf, fileName);
+    results.add(resultWithoutRss);
+
+    updateSparkConfWithRss(sparkConf);
+    updateSparkConfCustomer(sparkConf);
+    for (Codec.Type type :
+        new Codec.Type[]{
+            Codec.Type.NOOP,
+            Codec.Type.ZSTD,
+            Codec.Type.LZ4}) {
+      sparkConf.set("spark." + COMPRESSION_TYPE.key(), type.name());

Review Comment:
   It may be better to add  lowercase tests.



##########
common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import org.apache.uniffle.common.compression.Codec;
+
+import static org.apache.uniffle.common.compression.Codec.Type.LZ4;
+
+public class RssClientConf {
+
+  public static final ConfigOption<Codec.Type> COMPRESSION_TYPE = ConfigOptions
+      .key("rss.client.io.compression.codec")
+      .enumType(Codec.Type.class)
+      .defaultValue(LZ4)

Review Comment:
   It is better to use lowercase as the default string like `spark.io.compression.codec.`
   And this value should be case insensitive.



##########
common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import java.nio.ByteBuffer;
+
+import com.github.luben.zstd.Zstd;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+public class ZstdCodec extends Codec {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZstdCodec.class);
+
+  private final int compressionLevel;
+
+  public ZstdCodec(int level) {
+    this.compressionLevel = level;
+    LOGGER.info("Initializing zstd compressor.");
+  }
+
+  @Override
+  public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dst, int dstOffset) {
+    if (src.isDirect() && dst.isDirect()) {
+      long size = Zstd.decompressDirectByteBuffer(
+          dst, dstOffset, uncompressedLen,
+          src, src.position(), src.limit() - src.position()
+      );
+      if (size != uncompressedLen) {
+        throw new RssException(
+            "This should not happen that the decompressed data size is not equals to original size.");
+      }
+      return;
+    }
+
+    if (!src.isDirect() && !dst.isDirect()) {
+      Zstd.decompressByteArray(
+          dst.array(), dstOffset, uncompressedLen,
+          src.array(), src.position(), src.limit() - src.position()
+      );
+      return;
+    }
+
+    throw new RssException("Zstd only supports the same type of bytebuffer decompression.");

Review Comment:
   It may be better to use IllegalStateException when one is direct but another is not direct.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r998036423


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   I propose the new commit according to your idea, https://github.com/apache/incubator-uniffle/pull/254/commits/866b642342b5854f266e6a03d07c673dc61a2f90
   
   Do I get your point? @frankliee 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r1000420726


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   Emm.... OK. I will obey this project style.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1283386353

   PTAL @frankliee . I have updated the latest commit according to your advice. If this is OK, I will go ahead.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r1000311449


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   I prefer this style 
   ```
   abstract class Codec {
      private static class Compressor {}
      private static class deCompressor{}
      private getCompressor()  // for init lazily
      private getDeCompressor()
    
      public compress() 
      public deCompress()
   }
   
   ZSTDCodec extends Codec {}
   LZ4Codec extends Codec {}
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990719299


##########
common/src/main/java/org/apache/uniffle/common/compression/CompressionFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
+import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+
+public class CompressionFactory {
+
+  public enum Type {
+    LZ4,
+    ZSTD,
+    NOOP,
+  }
+
+  private CompressionFactory() {
+    // ignore
+  }
+
+  private static class LazyHolder {
+    static final CompressionFactory INSTANCE = new CompressionFactory();
+  }
+
+  public static CompressionFactory of() {

Review Comment:
   Emm. I refer to the implementation of flink, like this https://github.com/apache/flink/blob/7336672fa1e7b322122ff7ddc1e01227b716c070/flink-table/flink-table-common/src/main/java/org/apache/flink/table/catalog/CatalogTable.java#L51



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990721194


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -109,24 +123,20 @@ public boolean hasNext() {
       shuffleReadMetrics.incFetchWaitTime(fetchDuration);
       if (compressedData != null) {
         shuffleReadMetrics.incRemoteBytesRead(compressedData.limit() - compressedData.position());
-        // Directbytebuffers are not collected in time will cause executor easy 
-        // be killed by cluster managers(such as YARN) for using too much offheap memory
-        if (uncompressedData != null && uncompressedData.isDirect()) {
-          try {
-            RssShuffleUtils.destroyDirectByteBuffer(uncompressedData);
-          } catch (Exception e) {
-            throw new RssException("Destroy DirectByteBuffer failed!", e);
-          }
+
+        int uncompressedLen = compressedBlock.getUncompressLength();
+        if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
+          uncompressedData = ByteBuffer.allocate(uncompressedLen);

Review Comment:
   In original implementation, the bytebuffer will be destoryed and recreate. So to avoid the frequent GC, it use the offheap-bytebuffer. 
   
   And in this PR, we will recycle the bytebuffer, so I think it's no need to use the off-heap memory now. Maybe we should add the off-heap support in the next PR.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990725476


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -38,8 +38,12 @@
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.compression.CompressionFactory;
+import org.apache.uniffle.common.compression.Decompressor;
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_WRITER_BUFFER_SIZE;

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r996888602


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   So you mean that I need to create similar Zstd/LZ4CompressionCodec to implement the Compressor and Decompressor interface? 
   
   If that, it will make hard to init the corresponding var for specific compressor or decompressor, like the `this.lz4Factory = LZ4Factory.fastestInstance();`.
   
   Please let me know if i'm wrong



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990999460


##########
common/src/main/java/org/apache/uniffle/common/compression/NoOpCompressor.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+/**
+ * The class is to be as the basic compressor for test cases
+ */
+public class NoOpCompressor implements Compressor {
+
+  @Override
+  public byte[] compress(byte[] data) {

Review Comment:
   data -> src ?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990883961


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -38,8 +38,12 @@
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.compression.CompressionFactory;
+import org.apache.uniffle.common.compression.Decompressor;
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_WRITER_BUFFER_SIZE;

Review Comment:
   > I think I missunderstand you meaning. So we just use `RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValueString()` to avoid static import, right?
   
   Yes



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1272872035

   Can you provide the test report of JVM memory usage?
   If the new compressor uses memory memory, it will increase the risk of OOM.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990721383


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -109,24 +123,20 @@ public boolean hasNext() {
       shuffleReadMetrics.incFetchWaitTime(fetchDuration);
       if (compressedData != null) {
         shuffleReadMetrics.incRemoteBytesRead(compressedData.limit() - compressedData.position());
-        // Directbytebuffers are not collected in time will cause executor easy 
-        // be killed by cluster managers(such as YARN) for using too much offheap memory
-        if (uncompressedData != null && uncompressedData.isDirect()) {
-          try {
-            RssShuffleUtils.destroyDirectByteBuffer(uncompressedData);
-          } catch (Exception e) {
-            throw new RssException("Destroy DirectByteBuffer failed!", e);
-          }
+
+        int uncompressedLen = compressedBlock.getUncompressLength();
+        if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
+          uncompressedData = ByteBuffer.allocate(uncompressedLen);

Review Comment:
   PTAL @jerqi . This is the different with the original implementation.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1288584692

   Gentle ping @jerqi @frankliee 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r1005178664


##########
common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import java.nio.ByteBuffer;
+
+import com.github.luben.zstd.Zstd;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+public class ZstdCodec extends Codec {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZstdCodec.class);
+
+  private final int compressionLevel;
+
+  public ZstdCodec(int level) {
+    this.compressionLevel = level;
+    LOGGER.info("Initializing zstd compressor.");
+  }
+
+  @Override
+  public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dst, int dstOffset) {
+    if (src.isDirect() && dst.isDirect()) {
+      long size = Zstd.decompressDirectByteBuffer(
+          dst, dstOffset, uncompressedLen,
+          src, src.position(), src.limit() - src.position()
+      );
+      if (size != uncompressedLen) {
+        throw new RssException(
+            "This should not happen that the decompressed data size is not equals to original size.");
+      }
+      return;
+    }
+
+    if (!src.isDirect() && !dst.isDirect()) {
+      Zstd.decompressByteArray(
+          dst.array(), dstOffset, uncompressedLen,
+          src.array(), src.position(), src.limit() - src.position()
+      );
+      return;
+    }
+
+    throw new RssException("Zstd only supports the same type of bytebuffer decompression.");

Review Comment:
   I hope it could be as RssException. Because we will catch all RssException in spark client side in our internal version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r991010587


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/RssSparkConfig.java:
##########
@@ -278,4 +281,17 @@ public Double apply(String in) {
   public static TypedConfigBuilder<String> createStringBuilder(ConfigBuilder builder) {
     return builder.stringConf();
   }
+
+  public static RssConf toRssConf(SparkConf sparkConf) {

Review Comment:
   I want to make compressorFactory accessed by MR and Spark to create concrete codec which will be initialized by specified conf, so it will have two choice.
   1. Use the shareable RssConf like this PR
   2. Introduce the extra config bean of compression (I think there is no need to do so)
   
   Besides, I want to refactor the code of MR/Spark client conf entry, this PR is to do some partial work. Please refer to #200



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1272939389

   > Can you provide the test report of JVM memory usage? If the new compressor uses much more memory, it will increase the risk of OOM.
   
   The monitor sceenshot is as follows, I dont find the obvious difference.
   
   ![x1](https://user-images.githubusercontent.com/8609142/194823321-1e51d044-6ce8-4e3c-abb5-9090a372ee03.png)
   
   
   ![x2](https://user-images.githubusercontent.com/8609142/194823349-c6aefac8-f7b2-4e16-805e-fd4b679e14f9.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r996261838


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   I mean the compress/decompress could share the same interface for the user.
   For example, `CompressionCodec` has `createOutputStream` (for compress) and `createInputStream` (for decompress).
      



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990726440


##########
common/src/main/java/org/apache/uniffle/common/compression/CompressionFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
+import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+
+public class CompressionFactory {
+
+  public enum Type {
+    LZ4,
+    ZSTD,
+    NOOP,
+  }
+
+  private CompressionFactory() {
+    // ignore
+  }
+
+  private static class LazyHolder {
+    static final CompressionFactory INSTANCE = new CompressionFactory();
+  }
+
+  public static CompressionFactory of() {

Review Comment:
   Done. Rename to `getInstance`



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1272447225

   Should we add the document?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990733248


##########
common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import org.apache.uniffle.common.compression.CompressionFactory;
+
+import static org.apache.uniffle.common.compression.CompressionFactory.Type.ZSTD;
+
+public class RssClientConf {
+
+  public static final ConfigOption<CompressionFactory.Type> COMPRESSION_TYPE = ConfigOptions
+      .key("rss.client.io.compression.codec")
+      .enumType(CompressionFactory.Type.class)
+      .defaultValue(ZSTD)
+      .withDescription("");

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r995292140


##########
common/src/main/java/org/apache/uniffle/common/compression/CompressionFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
+import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+
+public class CompressionFactory {
+
+  public enum Type {
+    LZ4,
+    ZSTD,
+    NOOP,
+  }
+
+  private CompressionFactory() {
+    // ignore
+  }
+
+  private static class LazyHolder {
+    static final CompressionFactory INSTANCE = new CompressionFactory();
+  }
+
+  public static CompressionFactory getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  public Compressor getCompressor(RssConf conf) {
+    Type type = conf.get(COMPRESSION_TYPE);
+    switch (type) {
+      case ZSTD:

Review Comment:
   OK, got it



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r995229628


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   Let me do a simple review about hadoop codec.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1272455640

   > Should we add the document?
   
   Done


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r996888602


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   So you mean that I need to create similar Zstd/LZ4CompressionCodec to implement the Compressor and Decompressor interface? 
   
   If that, it will make hard to init the corresponding var for specific compressor or decompressor, like the `this.lz4Factory = LZ4Factory.fastestInstance();`.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r996888602


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   So you mean that I need to create similar Zstd/LZ4CompressionCodec to implement the Compressor and Decompressor interface? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r997976333


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   You can only provide a Codec instead of CompressionFactory, which hides the inner compressor and decompressor.
   The user could directly use Codec to compress or decompress data, so that the user does not need to use compressor and decompressor directly.
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r997996835


##########
common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import org.apache.uniffle.common.compression.CompressionFactory;
+
+import static org.apache.uniffle.common.compression.CompressionFactory.Type.ZSTD;
+
+public class RssClientConf {
+
+  public static final ConfigOption<CompressionFactory.Type> COMPRESSION_TYPE = ConfigOptions
+      .key("rss.client.io.compression.codec")
+      .enumType(CompressionFactory.Type.class)
+      .defaultValue(ZSTD)

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r1005269913


##########
common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import java.nio.ByteBuffer;
+
+import com.github.luben.zstd.Zstd;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+public class ZstdCodec extends Codec {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZstdCodec.class);
+
+  private final int compressionLevel;
+
+  public ZstdCodec(int level) {
+    this.compressionLevel = level;
+    LOGGER.info("Initializing zstd compressor.");
+  }
+
+  @Override
+  public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dst, int dstOffset) {
+    if (src.isDirect() && dst.isDirect()) {
+      long size = Zstd.decompressDirectByteBuffer(
+          dst, dstOffset, uncompressedLen,
+          src, src.position(), src.limit() - src.position()
+      );
+      if (size != uncompressedLen) {
+        throw new RssException(
+            "This should not happen that the decompressed data size is not equals to original size.");
+      }
+      return;
+    }
+
+    if (!src.isDirect() && !dst.isDirect()) {
+      Zstd.decompressByteArray(
+          dst.array(), dstOffset, uncompressedLen,
+          src.array(), src.position(), src.limit() - src.position()
+      );
+      return;
+    }
+
+    throw new RssException("Zstd only supports the same type of bytebuffer decompression.");

Review Comment:
   Done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r994602545


##########
common/src/main/java/org/apache/uniffle/common/compression/CompressionFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
+import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+
+public class CompressionFactory {
+
+  public enum Type {
+    LZ4,
+    ZSTD,
+    NOOP,
+  }
+
+  private CompressionFactory() {
+    // ignore
+  }
+
+  private static class LazyHolder {
+    static final CompressionFactory INSTANCE = new CompressionFactory();
+  }
+
+  public static CompressionFactory getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  public Compressor getCompressor(RssConf conf) {
+    Type type = conf.get(COMPRESSION_TYPE);
+    switch (type) {
+      case ZSTD:

Review Comment:
   How to populate dedicated params of specific codec? Like ZSTD level



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r995315155


##########
integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java:
##########
@@ -145,12 +145,12 @@ public void resultCompareTest() throws Exception {
   Map runTest(SparkSession spark, String fileName) throws Exception {
     Thread.sleep(4000);
     Map<Integer, String> map = Maps.newHashMap();
-    Dataset<Row> df2 = spark.range(0, 1000, 1, 10)
+    Dataset<Row> df2 = spark.range(0, 10000, 1, 10)
         .select(functions.when(functions.col("id").$less(250), 249)
             .otherwise(functions.col("id")).as("key2"), functions.col("id").as("value2"));
-    Dataset<Row> df1 = spark.range(0, 1000, 1, 10)
+    Dataset<Row> df1 = spark.range(0, 10000, 1, 10)
         .select(functions.when(functions.col("id").$less(250), 249)
-            .when(functions.col("id").$greater(750), 1000)
+            .when(functions.col("id").$greater(750), 10000)

Review Comment:
   Yes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r996250450


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {

Review Comment:
   I dont find co/decompressor mixed in Hadoop one interface. 
   
   https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/compress/Compressor.java



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r994540516


##########
common/src/main/java/org/apache/uniffle/common/compression/ZstdCompressor.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import com.github.luben.zstd.Zstd;

Review Comment:
   OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r1005175594


##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/RepartitionWithLocalFileRssTest.java:
##########
@@ -53,4 +60,35 @@ public static void setupServers() throws Exception {
   @Override
   public void updateRssStorage(SparkConf sparkConf) {
   }
+
+  /**
+   * Test different compression types with localfile rss mode.
+   * @throws Exception
+   */
+  @Override
+  public void run() throws Exception {
+    String fileName = generateTestFile();
+    SparkConf sparkConf = createSparkConf();
+    Uninterruptibles.sleepUninterruptibly(2, TimeUnit.SECONDS);
+
+    List<Map> results = new ArrayList<>();
+    Map resultWithoutRss = runSparkApp(sparkConf, fileName);
+    results.add(resultWithoutRss);
+
+    updateSparkConfWithRss(sparkConf);
+    updateSparkConfCustomer(sparkConf);
+    for (Codec.Type type :
+        new Codec.Type[]{
+            Codec.Type.NOOP,
+            Codec.Type.ZSTD,
+            Codec.Type.LZ4}) {
+      sparkConf.set("spark." + COMPRESSION_TYPE.key(), type.name());

Review Comment:
   Got it. I will



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1272257195

   PTAL @jerqi 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990720968


##########
common/src/main/java/org/apache/uniffle/common/compression/CompressionFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
+import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+
+public class CompressionFactory {
+
+  public enum Type {
+    LZ4,
+    ZSTD,
+    NOOP,
+  }
+
+  private CompressionFactory() {
+    // ignore
+  }
+
+  private static class LazyHolder {
+    static final CompressionFactory INSTANCE = new CompressionFactory();
+  }
+
+  public static CompressionFactory of() {

Review Comment:
   Got it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990721644


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -38,8 +38,12 @@
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.compression.CompressionFactory;
+import org.apache.uniffle.common.compression.Decompressor;
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_WRITER_BUFFER_SIZE;

Review Comment:
   I think I missunderstand you meaning. So we just to use `RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValueString()` to avoid static import, right? 



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -38,8 +38,12 @@
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.compression.CompressionFactory;
+import org.apache.uniffle.common.compression.Decompressor;
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_WRITER_BUFFER_SIZE;

Review Comment:
   I think I missunderstand you meaning. So we just use `RssSparkConfig.RSS_WRITER_BUFFER_SIZE.defaultValueString()` to avoid static import, right? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990718592


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -38,8 +38,12 @@
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.compression.CompressionFactory;
+import org.apache.uniffle.common.compression.Decompressor;
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_WRITER_BUFFER_SIZE;

Review Comment:
   Yes. We'd better to avoid this. And in later refactor, all client side's config should be involved in shareable `RssClientConf` class. This problem will not occur, which is mentioned in #200



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990884956


##########
common/src/main/java/org/apache/uniffle/common/compression/Lz4Compressor.java:
##########
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import net.jpountz.lz4.LZ4Compressor;
+import net.jpountz.lz4.LZ4Factory;
+
+public class Lz4Compressor implements Compressor {
+
+  private LZ4Compressor compressor;
+
+  public Lz4Compressor() {
+    this.compressor = LZ4Factory.fastestInstance().fastCompressor();

Review Comment:
   One question:
   Is it a thread safe? 
   In our origin implement, we will use one fastestInstance in one thread.
   In this pr's implement, we will share one fastestInstance in all threads.



##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -109,24 +123,20 @@ public boolean hasNext() {
       shuffleReadMetrics.incFetchWaitTime(fetchDuration);
       if (compressedData != null) {
         shuffleReadMetrics.incRemoteBytesRead(compressedData.limit() - compressedData.position());
-        // Directbytebuffers are not collected in time will cause executor easy 
-        // be killed by cluster managers(such as YARN) for using too much offheap memory
-        if (uncompressedData != null && uncompressedData.isDirect()) {
-          try {
-            RssShuffleUtils.destroyDirectByteBuffer(uncompressedData);
-          } catch (Exception e) {
-            throw new RssException("Destroy DirectByteBuffer failed!", e);
-          }
+
+        int uncompressedLen = compressedBlock.getUncompressLength();
+        if (uncompressedData == null || uncompressedData.capacity() < uncompressedLen) {
+          uncompressedData = ByteBuffer.allocate(uncompressedLen);

Review Comment:
   It's ok for me.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990999460


##########
common/src/main/java/org/apache/uniffle/common/compression/NoOpCompressor.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+/**
+ * The class is to be as the basic compressor for test cases
+ */
+public class NoOpCompressor implements Compressor {
+
+  @Override
+  public byte[] compress(byte[] data) {

Review Comment:
   data -> src ?
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r991004762


##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java:
##########
@@ -99,7 +99,7 @@ public void updateSparkConfWithRss(SparkConf sparkConf) {
     sparkConf.set(RssSparkConfig.RSS_HEARTBEAT_INTERVAL.key(), "2000");
   }
 
-  private void verifyTestResult(Map expected, Map actual) {
+  public void verifyTestResult(Map expected, Map actual) {

Review Comment:
   protected? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r994549560


##########
common/src/main/java/org/apache/uniffle/common/compression/CompressionFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
+import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+
+public class CompressionFactory {
+
+  public enum Type {
+    LZ4,
+    ZSTD,
+    NOOP,
+  }
+
+  private CompressionFactory() {
+    // ignore
+  }
+
+  private static class LazyHolder {
+    static final CompressionFactory INSTANCE = new CompressionFactory();
+  }
+
+  public static CompressionFactory getInstance() {
+    return LazyHolder.INSTANCE;
+  }
+
+  public Compressor getCompressor(RssConf conf) {
+    Type type = conf.get(COMPRESSION_TYPE);
+    switch (type) {
+      case ZSTD:

Review Comment:
   I prefer to just using COMPRESSION_TYPE as the parameter for two reasons.
   
   1. Most of functions in Uniffle do not use Conf as the parameter.
   2. Avoid to modify the content of Conf by mistakes.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r995302963


##########
integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java:
##########
@@ -145,12 +145,12 @@ public void resultCompareTest() throws Exception {
   Map runTest(SparkSession spark, String fileName) throws Exception {
     Thread.sleep(4000);
     Map<Integer, String> map = Maps.newHashMap();
-    Dataset<Row> df2 = spark.range(0, 1000, 1, 10)
+    Dataset<Row> df2 = spark.range(0, 10000, 1, 10)
         .select(functions.when(functions.col("id").$less(250), 249)
             .otherwise(functions.col("id")).as("key2"), functions.col("id").as("value2"));
-    Dataset<Row> df1 = spark.range(0, 1000, 1, 10)
+    Dataset<Row> df1 = spark.range(0, 10000, 1, 10)
         .select(functions.when(functions.col("id").$less(250), 249)
-            .when(functions.col("id").$greater(750), 1000)
+            .when(functions.col("id").$greater(750), 10000)

Review Comment:
   If we use lz4 as default compress codec, it seems that we don't need modify this.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] codecov-commenter commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
codecov-commenter commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1272286981

   # [Codecov](https://codecov.io/gh/apache/incubator-uniffle/pull/254?src=pr&el=h1&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) Report
   > Merging [#254](https://codecov.io/gh/apache/incubator-uniffle/pull/254?src=pr&el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (5b34599) into [master](https://codecov.io/gh/apache/incubator-uniffle/commit/4408359fcc9420571d1c8a1fb15c6b551c47f7ca?el=desc&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) (4408359) will **decrease** coverage by `1.14%`.
   > The diff coverage is `69.56%`.
   
   ```diff
   @@             Coverage Diff              @@
   ##             master     #254      +/-   ##
   ============================================
   - Coverage     59.26%   58.12%   -1.15%     
   + Complexity     1345     1283      -62     
   ============================================
     Files           162      161       -1     
     Lines          8789     8379     -410     
     Branches        828      804      -24     
   ============================================
   - Hits           5209     4870     -339     
   + Misses         3312     3246      -66     
   + Partials        268      263       -5     
   ```
   
   
   | [Impacted Files](https://codecov.io/gh/apache/incubator-uniffle/pull/254?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | Coverage Δ | |
   |---|---|---|
   | [...rg/apache/hadoop/mapred/RssMapOutputCollector.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkL1Jzc01hcE91dHB1dENvbGxlY3Rvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [.../java/org/apache/hadoop/mapreduce/RssMRConfig.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL1Jzc01SQ29uZmlnLmphdmE=) | `23.07% <0.00%> (-51.93%)` | :arrow_down: |
   | [...pache/hadoop/mapreduce/task/reduce/RssShuffle.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL3Rhc2svcmVkdWNlL1Jzc1NodWZmbGUuamF2YQ==) | `0.00% <0.00%> (ø)` | |
   | [...che/uniffle/common/compression/NoOpCompressor.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9jb21wcmVzc2lvbi9Ob09wQ29tcHJlc3Nvci5qYXZh) | `0.00% <0.00%> (ø)` | |
   | [...e/uniffle/common/compression/NoOpDecompressor.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9jb21wcmVzc2lvbi9Ob09wRGVjb21wcmVzc29yLmphdmE=) | `0.00% <0.00%> (ø)` | |
   | [...e/uniffle/common/compression/ZstdDecompressor.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9jb21wcmVzc2lvbi9ac3RkRGVjb21wcmVzc29yLmphdmE=) | `68.75% <68.75%> (ø)` | |
   | [...uniffle/common/compression/CompressionFactory.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9jb21wcmVzc2lvbi9Db21wcmVzc2lvbkZhY3RvcnkuamF2YQ==) | `75.00% <75.00%> (ø)` | |
   | [...rg/apache/uniffle/common/config/RssClientConf.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y29tbW9uL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS91bmlmZmxlL2NvbW1vbi9jb25maWcvUnNzQ2xpZW50Q29uZi5qYXZh) | `93.75% <93.75%> (ø)` | |
   | [...g/apache/hadoop/mapred/SortWriteBufferManager.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkL1NvcnRXcml0ZUJ1ZmZlck1hbmFnZXIuamF2YQ==) | `80.10% <100.00%> (+0.21%)` | :arrow_up: |
   | [...pache/hadoop/mapreduce/task/reduce/RssFetcher.java](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation#diff-Y2xpZW50LW1yL3NyYy9tYWluL2phdmEvb3JnL2FwYWNoZS9oYWRvb3AvbWFwcmVkdWNlL3Rhc2svcmVkdWNlL1Jzc0ZldGNoZXIuamF2YQ==) | `90.90% <100.00%> (+1.84%)` | :arrow_up: |
   | ... and [16 more](https://codecov.io/gh/apache/incubator-uniffle/pull/254/diff?src=pr&el=tree-more&utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation) | |
   
   :mega: We’re building smart automated test selection to slash your CI/CD build times. [Learn more](https://about.codecov.io/iterative-testing/?utm_medium=referral&utm_source=github&utm_content=comment&utm_campaign=pr+comments&utm_term=The+Apache+Software+Foundation)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r1005178664


##########
common/src/main/java/org/apache/uniffle/common/compression/ZstdCodec.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import java.nio.ByteBuffer;
+
+import com.github.luben.zstd.Zstd;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.uniffle.common.exception.RssException;
+
+public class ZstdCodec extends Codec {
+  private static final Logger LOGGER = LoggerFactory.getLogger(ZstdCodec.class);
+
+  private final int compressionLevel;
+
+  public ZstdCodec(int level) {
+    this.compressionLevel = level;
+    LOGGER.info("Initializing zstd compressor.");
+  }
+
+  @Override
+  public void decompress(ByteBuffer src, int uncompressedLen, ByteBuffer dst, int dstOffset) {
+    if (src.isDirect() && dst.isDirect()) {
+      long size = Zstd.decompressDirectByteBuffer(
+          dst, dstOffset, uncompressedLen,
+          src, src.position(), src.limit() - src.position()
+      );
+      if (size != uncompressedLen) {
+        throw new RssException(
+            "This should not happen that the decompressed data size is not equals to original size.");
+      }
+      return;
+    }
+
+    if (!src.isDirect() && !dst.isDirect()) {
+      Zstd.decompressByteArray(
+          dst.array(), dstOffset, uncompressedLen,
+          src.array(), src.position(), src.limit() - src.position()
+      );
+      return;
+    }
+
+    throw new RssException("Zstd only supports the same type of bytebuffer decompression.");

Review Comment:
   I hope it could be as RssException. Because we will catch all RssException in spark client side in our internal version.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r1005175503


##########
common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import org.apache.uniffle.common.compression.Codec;
+
+import static org.apache.uniffle.common.compression.Codec.Type.LZ4;
+
+public class RssClientConf {
+
+  public static final ConfigOption<Codec.Type> COMPRESSION_TYPE = ConfigOptions
+      .key("rss.client.io.compression.codec")
+      .enumType(Codec.Type.class)
+      .defaultValue(LZ4)

Review Comment:
   The `ConfigOption<Codec.Type>` is case insensitive. The lz4 or LZ4 or Lz4 are all OK.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1291861617

   LGTM, thanks for your contributions.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee merged pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee merged PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1291562757

   Updated @frankliee . Could you help review again? 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r994508820


##########
common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import org.apache.uniffle.common.compression.CompressionFactory;
+
+import static org.apache.uniffle.common.compression.CompressionFactory.Type.ZSTD;
+
+public class RssClientConf {
+
+  public static final ConfigOption<CompressionFactory.Type> COMPRESSION_TYPE = ConfigOptions
+      .key("rss.client.io.compression.codec")
+      .enumType(CompressionFactory.Type.class)
+      .defaultValue(ZSTD)

Review Comment:
   Could we use 'lz4' as default value?



##########
integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java:
##########
@@ -145,12 +145,12 @@ public void resultCompareTest() throws Exception {
   Map runTest(SparkSession spark, String fileName) throws Exception {
     Thread.sleep(4000);
     Map<Integer, String> map = Maps.newHashMap();
-    Dataset<Row> df2 = spark.range(0, 1000, 1, 10)
+    Dataset<Row> df2 = spark.range(0, 10000, 1, 10)
         .select(functions.when(functions.col("id").$less(250), 249)
             .otherwise(functions.col("id")).as("key2"), functions.col("id").as("value2"));
-    Dataset<Row> df1 = spark.range(0, 1000, 1, 10)
+    Dataset<Row> df1 = spark.range(0, 10000, 1, 10)
         .select(functions.when(functions.col("id").$less(250), 249)
-            .when(functions.col("id").$greater(750), 1000)
+            .when(functions.col("id").$greater(750), 10000)

Review Comment:
   Why do we change this?



##########
common/src/main/java/org/apache/uniffle/common/compression/ZstdCompressor.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import com.github.luben.zstd.Zstd;

Review Comment:
   Zstd have many implements, why do we choose this?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1282143970

   PTAL @jerqi 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990728230


##########
common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import org.apache.uniffle.common.compression.CompressionFactory;
+
+import static org.apache.uniffle.common.compression.CompressionFactory.Type.ZSTD;
+
+public class RssClientConf {
+
+  public static final ConfigOption<CompressionFactory.Type> COMPRESSION_TYPE = ConfigOptions
+      .key("rss.client.io.compression.codec")
+      .enumType(CompressionFactory.Type.class)
+      .defaultValue(ZSTD)
+      .withDescription("");

Review Comment:
   Yes. If there is no need to change the config entry, I will add these configs to doc.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r991015250


##########
integration-test/spark-common/src/test/java/org/apache/uniffle/test/SparkIntegrationTestBase.java:
##########
@@ -99,7 +99,7 @@ public void updateSparkConfWithRss(SparkConf sparkConf) {
     sparkConf.set(RssSparkConfig.RSS_HEARTBEAT_INTERVAL.key(), "2000");
   }
 
-  private void verifyTestResult(Map expected, Map actual) {
+  public void verifyTestResult(Map expected, Map actual) {

Review Comment:
   Got it.



##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {
+
+  byte[] compress(byte[] data);

Review Comment:
   Got it.



##########
common/src/main/java/org/apache/uniffle/common/compression/NoOpCompressor.java:
##########
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+/**
+ * The class is to be as the basic compressor for test cases
+ */
+public class NoOpCompressor implements Compressor {
+
+  @Override
+  public byte[] compress(byte[] data) {

Review Comment:
   Got it.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990727461


##########
common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java:
##########
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import org.apache.uniffle.common.compression.CompressionFactory;
+
+import static org.apache.uniffle.common.compression.CompressionFactory.Type.ZSTD;
+
+public class RssClientConf {
+
+  public static final ConfigOption<CompressionFactory.Type> COMPRESSION_TYPE = ConfigOptions
+      .key("rss.client.io.compression.codec")
+      .enumType(CompressionFactory.Type.class)
+      .defaultValue(ZSTD)
+      .withDescription("");

Review Comment:
   Should we add the description information?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1275540120

   Gentle ping @frankliee @jerqi 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1286506925

   Bug: I found the zstd has no such method of `decompressByteArray` in Spark2.4.6 zstd version of 1.3.2-2 
   
   To be compatible with the older version, I think I should use the reflection to check it.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1277429736

   Do u have any concerns?  @jerqi @frankliee 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r994513736


##########
common/src/main/java/org/apache/uniffle/common/compression/ZstdCompressor.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import com.github.luben.zstd.Zstd;

Review Comment:
   It’s also used in Spark



##########
common/src/main/java/org/apache/uniffle/common/config/RssClientConf.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.config;
+
+import org.apache.uniffle.common.compression.CompressionFactory;
+
+import static org.apache.uniffle.common.compression.CompressionFactory.Type.ZSTD;
+
+public class RssClientConf {
+
+  public static final ConfigOption<CompressionFactory.Type> COMPRESSION_TYPE = ConfigOptions
+      .key("rss.client.io.compression.codec")
+      .enumType(CompressionFactory.Type.class)
+      .defaultValue(ZSTD)

Review Comment:
   It’s OK



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r994515770


##########
integration-test/spark3/src/test/java/org/apache/uniffle/test/GetShuffleReportForMultiPartTest.java:
##########
@@ -145,12 +145,12 @@ public void resultCompareTest() throws Exception {
   Map runTest(SparkSession spark, String fileName) throws Exception {
     Thread.sleep(4000);
     Map<Integer, String> map = Maps.newHashMap();
-    Dataset<Row> df2 = spark.range(0, 1000, 1, 10)
+    Dataset<Row> df2 = spark.range(0, 10000, 1, 10)
         .select(functions.when(functions.col("id").$less(250), 249)
             .otherwise(functions.col("id")).as("key2"), functions.col("id").as("value2"));
-    Dataset<Row> df1 = spark.range(0, 1000, 1, 10)
+    Dataset<Row> df1 = spark.range(0, 10000, 1, 10)
         .select(functions.when(functions.col("id").$less(250), 249)
-            .when(functions.col("id").$greater(750), 1000)
+            .when(functions.col("id").$greater(750), 10000)

Review Comment:
   The data size of zstd compressed is slow than the skew join threshold. So increase this record number 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1272256773

   ### Terasort Compression Benchmark
   
   100GB terasort
   
   ![c1](https://user-images.githubusercontent.com/8609142/194696365-1dd6ed88-f23d-4bd1-8ab0-43371865f6a1.png)
   ![c2](https://user-images.githubusercontent.com/8609142/194696398-5b8a139f-273f-4df1-b918-928cf86441b1.png)
   ![c3](https://user-images.githubusercontent.com/8609142/194696376-95720873-a5d8-4bdb-a4e2-8dc3a8b51ea2.png)
   ![c4](https://user-images.githubusercontent.com/8609142/194696380-edbc45f4-452c-4e8f-9888-1f6287f32c33.png)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990634145


##########
common/src/main/java/org/apache/uniffle/common/compression/CompressionFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
+import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+
+public class CompressionFactory {
+
+  public enum Type {
+    LZ4,
+    ZSTD,
+    NOOP,
+  }
+
+  private CompressionFactory() {
+    // ignore
+  }
+
+  private static class LazyHolder {
+    static final CompressionFactory INSTANCE = new CompressionFactory();
+  }
+
+  public static CompressionFactory of() {

Review Comment:
   Why is this method name is `of`?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] frankliee commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
frankliee commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r991000596


##########
common/src/main/java/org/apache/uniffle/common/compression/Compressor.java:
##########
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+public interface Compressor {
+
+  byte[] compress(byte[] data);

Review Comment:
   data -> src



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] zuston commented on pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
zuston commented on PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#issuecomment-1272984020

   Updated @frankliee 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990720343


##########
common/src/main/java/org/apache/uniffle/common/compression/CompressionFactory.java:
##########
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.uniffle.common.compression;
+
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.uniffle.common.config.RssClientConf.COMPRESSION_TYPE;
+import static org.apache.uniffle.common.config.RssClientConf.ZSTD_COMPRESSION_LEVEL;
+
+public class CompressionFactory {
+
+  public enum Type {
+    LZ4,
+    ZSTD,
+    NOOP,
+  }
+
+  private CompressionFactory() {
+    // ignore
+  }
+
+  private static class LazyHolder {
+    static final CompressionFactory INSTANCE = new CompressionFactory();
+  }
+
+  public static CompressionFactory of() {

Review Comment:
   Could we make it consistent with other factory methods in our project?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org


[GitHub] [incubator-uniffle] jerqi commented on a diff in pull request #254: Support ZSTD

Posted by GitBox <gi...@apache.org>.
jerqi commented on code in PR #254:
URL: https://github.com/apache/incubator-uniffle/pull/254#discussion_r990720327


##########
client-spark/common/src/main/java/org/apache/spark/shuffle/reader/RssShuffleDataIterator.java:
##########
@@ -38,8 +38,12 @@
 
 import org.apache.uniffle.client.api.ShuffleReadClient;
 import org.apache.uniffle.client.response.CompressedShuffleBlock;
-import org.apache.uniffle.common.RssShuffleUtils;
-import org.apache.uniffle.common.exception.RssException;
+import org.apache.uniffle.client.util.RssClientConfig;
+import org.apache.uniffle.common.compression.CompressionFactory;
+import org.apache.uniffle.common.compression.Decompressor;
+import org.apache.uniffle.common.config.RssConf;
+
+import static org.apache.spark.shuffle.RssSparkConfig.RSS_WRITER_BUFFER_SIZE;

Review Comment:
   Could we make it consistent with other factory methods in our project?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: issues-unsubscribe@uniffle.apache.org
For additional commands, e-mail: issues-help@uniffle.apache.org