Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/08 06:56:16 UTC

[GitHub] [flink] reswqa opened a new pull request, #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

reswqa opened a new pull request, #20216:
URL: https://github.com/apache/flink/pull/20216

   
   ## What is the purpose of the change
   
   *Introduce more compression algorithms based on the aircompressor library*
   
   
   ## Brief change log
   
     - *Introduce more compression algorithms based on the aircompressor library*
     - *Migrate some related tests to JUnit 5 and AssertJ*
   
   
   ## Verifying this change
   
   This change is already covered by existing tests.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): no
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
     - The serializers: no
     - The runtime per-record code paths (performance sensitive): no
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: no
     - The S3 file system connector: no
   
   ## Documentation
   
     - Does this pull request introduce a new feature? no
     - If yes, how is the feature documented? not applicable
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@flink.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


[GitHub] [flink] reswqa commented on pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on PR #20216:
URL: https://github.com/apache/flink/pull/20216#issuecomment-1200755216

   @wsry Thanks for the review. I have addressed all of your comments, please take another look.




[GitHub] [flink] wsry commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r934354937


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +111,34 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            MemorySegment memorySegment = buffer.getMemorySegment();
+            // If buffer is in-heap, manipulate the underlying array directly. There are two main
+            // reasons why NIO buffer is not directly used here: One is that some compression
+            // libraries will use the underlying array for heap buffer, but our input buffer may be
+            // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
+            // is that for the buffer in the heap directly operates the underlying array can reduce
+            // additional overhead compared to generating NIO buffer.

Review Comment:
   ```suggestion
               // If buffer is on-heap, manipulate the underlying array directly. There are two main
               // reasons why NIO buffer is not directly used here: One is that some compression
               // libraries will use the underlying array for heap buffer, but our input buffer may be
               // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
               // is that for the on-heap buffer, directly operating the underlying array can reduce
               // additional overhead compared to generating a NIO buffer.
   ```
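For readers following along, the branch this suggestion documents looks roughly like the sketch below. It is assembled from the hunks quoted in this thread (`blockCompressor`, `internalBuffer`, and `internalBufferArray` as in `BufferCompressor`), not the merged code:

```java
int length = buffer.getSize();
MemorySegment memorySegment = buffer.getMemorySegment();
int compressedLen;
if (!memorySegment.isOffHeap()) {
    // On-heap: hand the backing arrays to the compressor directly. This avoids
    // read-only NIO views and the overhead of creating ByteBuffer wrappers.
    compressedLen =
            blockCompressor.compress(
                    memorySegment.getArray(),
                    buffer.getMemorySegmentOffset(),
                    length,
                    internalBufferArray,
                    0);
} else {
    // Off-heap: fall back to the NIO buffer path.
    compressedLen =
            blockCompressor.compress(
                    buffer.getNioBuffer(0, length),
                    0,
                    length,
                    internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
                    0);
}
```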



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockDecompressor.java:
##########
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+
+/** Flink decompressor that wraps {@link Decompressor}. */
+public class AirBlockDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;

Review Comment:
   private final



##########
flink-dist/src/main/resources/META-INF/NOTICE:
##########
@@ -12,6 +12,7 @@ This project bundles the following dependencies under the Apache Software Licens
 - commons-cli:commons-cli:1.5.0
 - commons-collections:commons-collections:3.2.2
 - commons-io:commons-io:2.11.0
+- io.airlift:aircompressor:0.21

Review Comment:
   According to the notice check result, we should remove this.



##########
docs/layouts/shortcodes/generated/all_taskmanager_network_section.html:
##########
@@ -20,6 +20,12 @@
             <td>String</td>
             <td>The blocking shuffle type, either "mmap" or "file". The "auto" means selecting the property type automatically based on system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted by configured memory limits, but some resource frameworks like yarn would track this memory usage and kill the container once memory exceeding some threshold. Also note that this option is experimental and might be changed future.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.compression.codec</h5></td>
+            <td style="word-wrap: break-word;">"LZ4"</td>
+            <td>String</td>
+            <td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "Z_STD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression rate is the lowest. "Z_STD"has the highest compression rate, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed future.</td>

Review Comment:
   ```suggestion
               <td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "ZSTD" are supported now. Through tpc-ds test of these three algorithms, the results show that "LZ4" algorithm has the highest compression and decompression speed, but the compression ratio is the lowest. "ZSTD" has the highest compression ratio, but the compression and decompression speed is the slowest, and LZO is between the two. Also note that this option is experimental and might be changed in the future.</td>
   ```
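Assuming the option key and values documented above, the codec can also be set programmatically via `org.apache.flink.configuration.Configuration` (illustrative only; `LZ4` remains the default):

```java
Configuration config = new Configuration();
// Trade throughput for size: ZSTD compresses best but is slowest; LZO sits in between.
config.setString("taskmanager.network.compression.codec", "ZSTD");
```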



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockCompressor.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+
+/** Flink compressor that wraps {@link Compressor}. */
+public class AirBlockCompressor implements BlockCompressor {
+    private final Compressor internalCompressor;
+
+    public AirBlockCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws BufferCompressionException {
+        try {

Review Comment:
   Maybe also add a length check, just like the `compress` method below.
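A hypothetical shape for that check, mirroring the `byte[]` overload below; the exact guard in the final patch may differ:

```java
if (dst.capacity() - dst.position() - dstOff < getMaxCompressedSize(srcLen)) {
    // Wrap the cause in the Throwable constructor shown in the diff.
    throw new BufferCompressionException(
            new IllegalArgumentException("Insufficient destination buffer for compression."));
}
```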



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferCompressionException.java:
##########
@@ -19,24 +19,24 @@
 package org.apache.flink.runtime.io.compression;
 
 /**
- * A {@code DataCorruptionException} is thrown when the decompressed data is corrupted and cannot be
- * decompressed.
+ * A {@code BufferCompressionException} is thrown when the compression data cannot be compressed,

Review Comment:
   ```suggestion
    * A {@code BufferCompressionException} is thrown when the target data cannot be compressed,
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -37,15 +37,19 @@ public class BufferDecompressor {
     /** The intermediate buffer for the decompressed data. */
     private final NetworkBuffer internalBuffer;
 
+    /** The backup array of intermediate buffer. */
+    private final byte[] internalBufferArray;
+
     public BufferDecompressor(int bufferSize, String factoryName) {
         checkArgument(bufferSize > 0);
         checkNotNull(factoryName);
 
         // the decompressed data size should be never larger than the configured buffer size
-        final byte[] heapBuffer = new byte[bufferSize];
+        internalBufferArray = new byte[bufferSize];

Review Comment:
   nit: `internalBufferArray` -> `this.internalBufferArray`



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferDecompressionException.java:
##########
@@ -19,25 +19,24 @@
 package org.apache.flink.runtime.io.compression;
 
 /**
- * An {@code InsufficientBufferException} is thrown when there is no enough buffer to serialize or
- * deserialize a buffer to another buffer. When such exception being caught, user may enlarge the
- * output buffer and try again.
+ * A {@code BufferDecompressionException} is thrown when the decompressed data cannot be

Review Comment:
   ```suggestion
    * A {@code BufferDecompressionException} is thrown when the target data cannot be
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockCompressor.java:
##########
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+
+/** Flink compressor that wraps {@link Compressor}. */
+public class AirBlockCompressor implements BlockCompressor {
+    private final Compressor internalCompressor;
+
+    public AirBlockCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws BufferCompressionException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (Exception e) {
+            throw new BufferCompressionException(e);
+        }
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws BufferCompressionException {
+        try {
+            if (dst.length < dstOff + getMaxCompressedSize(srcLen) - 1) {

Review Comment:
   Remove ```- 1```
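With the off-by-one removed, the guard would become (sketch):

```java
if (dst.length < dstOff + getMaxCompressedSize(srcLen)) {
    throw new BufferCompressionException(
            new IllegalArgumentException("Destination buffer is too small."));
}
```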



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -36,16 +36,20 @@ public class BufferCompressor {
     /** The intermediate buffer for the compressed data. */
     private final NetworkBuffer internalBuffer;
 
+    /** The backup array of intermediate buffer. */
+    private final byte[] internalBufferArray;
+
     public BufferCompressor(int bufferSize, String factoryName) {
         checkArgument(bufferSize > 0);
         checkNotNull(factoryName);
         // the size of this intermediate heap buffer will be gotten from the
         // plugin configuration in the future, and currently, double size of
-        // the input buffer is enough for lz4-java compression library.
-        final byte[] heapBuffer = new byte[2 * bufferSize];
+        // the input buffer is enough for compression libraries we used.

Review Comment:
   ```suggestion
           // the input buffer is enough for the compression libraries used.
   ```



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +107,28 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        MemorySegment memorySegment = buffer.getMemorySegment();
+        // If buffer is in-heap, manipulate the underlying array directly. There are two main
+        // reasons why NIO buffer is not directly used here: One is that some compression
+        // libraries will use the underlying array for heap buffer, but our input buffer may be
+        // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
+        // is that for the buffer in the heap directly operates the underlying array can reduce
+        // additional overhead compared to generating NIO buffer.

Review Comment:
   ```suggestion
           // If buffer is on-heap, manipulate the underlying array directly. There are two main
           // reasons why NIO buffer is not directly used here: One is that some compression
           // libraries will use the underlying array for heap buffer, but our input buffer may be
           // a read-only ByteBuffer, and it is illegal to access internal array. Another reason
           // is that for the on-heap buffer, directly operating the underlying array can reduce
           // additional overhead compared to generating a NIO buffer.
   ```
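The decompression-side counterpart of that fast path, again assembled from the hunks in this thread rather than the merged code:

```java
int length = buffer.getSize();
MemorySegment memorySegment = buffer.getMemorySegment();
if (!memorySegment.isOffHeap()) {
    // On-heap: operate on the backing arrays directly.
    return blockDecompressor.decompress(
            memorySegment.getArray(),
            buffer.getMemorySegmentOffset(),
            length,
            internalBufferArray,
            0);
} else {
    // Off-heap: decompress via NIO buffer views.
    return blockDecompressor.decompress(
            buffer.getNioBuffer(0, length),
            0,
            length,
            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
            0);
}
```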





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933487835


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +103,22 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        // ByteBuffer nioBuffer = buffer.getNioBuffer(0, length);
+        if (!buffer.getMemorySegment().isOffHeap()) {
+            return blockDecompressor.decompress(
+                    buffer.getMemorySegment().getArray(),
+                    buffer.getMemorySegmentOffset(),
+                    length,
+                    internalBuffer.getNioBuffer(0, internalBuffer.capacity()).array(),

Review Comment:
   Maybe `internalBuffer.array()` is more appropriate?





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933483230


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.
+            if (!buffer.getMemorySegment().isOffHeap()) {

Review Comment:
   Good suggestion. In addition, for `internalBuffer`, can we promote the heap array to a class field named `internalBufferArray`, and directly use it instead of `internalBuffer.array()` in the `!buffer.getMemorySegment().isOffHeap()` branch, to avoid `ensureAccessible` every time?
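A sketch of that promotion in `BufferCompressor`; the `MemorySegmentFactory`/recycler wiring here is an assumption, not taken from the diff:

```java
private final NetworkBuffer internalBuffer;
private final byte[] internalBufferArray; // cached backing array, skips ensureAccessible()

public BufferCompressor(int bufferSize, String factoryName) {
    this.internalBufferArray = new byte[2 * bufferSize];
    this.internalBuffer =
            new NetworkBuffer(
                    MemorySegmentFactory.wrap(internalBufferArray),
                    FreeingBufferRecycler.INSTANCE);
    // ... block compressor created from factoryName as before ...
}
```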





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933460315


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java:
##########
@@ -18,32 +18,54 @@
 
 package org.apache.flink.runtime.io.compression;
 
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 
 import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
 import static org.junit.Assert.assertEquals;
 
 /** Tests for block compression. */
+@RunWith(Parameterized.class)
 public class BlockCompressionTest {
+    @Parameterized.Parameter public static BlockCompressionFactory compressionFactory;
+
+    @Parameterized.Parameters(name = "compressionFactory = {0}")
+    public static BlockCompressionFactory[] compressionFactory() {
+        return new BlockCompressionFactory[] {
+            new Lz4BlockCompressionFactory(),

Review Comment:
   Good suggestion, fixed it.
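Since the change log mentions migrating these tests to JUnit 5 and AssertJ, a rough equivalent of the parameterization above might look like the following. The factory accessors `getCompressor()`/`getDecompressor()` are assumed from the surrounding code, and only the LZ4 factory shown in this thread is listed:

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.util.concurrent.ThreadLocalRandom;

import static org.assertj.core.api.Assertions.assertThat;

class BlockCompressionTest {

    static BlockCompressionFactory[] compressionFactories() {
        return new BlockCompressionFactory[] {new Lz4BlockCompressionFactory()};
    }

    @ParameterizedTest(name = "compressionFactory = {0}")
    @MethodSource("compressionFactories")
    void compressAndDecompressBytes(BlockCompressionFactory factory) {
        byte[] data = new byte[1024];
        ThreadLocalRandom.current().nextBytes(data);

        // Round-trip: compress into a sufficiently sized buffer, then decompress.
        BlockCompressor compressor = factory.getCompressor();
        byte[] compressed = new byte[compressor.getMaxCompressedSize(data.length)];
        int compressedLen = compressor.compress(data, 0, data.length, compressed, 0);

        byte[] restored = new byte[data.length];
        int restoredLen =
                factory.getDecompressor().decompress(compressed, 0, compressedLen, restored, 0);

        assertThat(restoredLen).isEqualTo(data.length);
        assertThat(restored).isEqualTo(data);
    }
}
```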





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933489151


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink decompressor that wrap {@link io.airlift.compress.Decompressor}. */
+public class AirDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;
+
+    public AirDecompressor(Decompressor internalDecompressor) {
+        this.internalDecompressor = internalDecompressor;
+    }
+
+    @Override
+    public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws DataCorruptionException, InsufficientBufferException {
+        final int prevSrcOff = src.position() + srcOff;
+        final int prevDstOff = dst.position() + dstOff;
+
+        src.position(prevSrcOff);
+        dst.position(prevDstOff);
+        src.order(ByteOrder.LITTLE_ENDIAN);
+        final int compressedLen = src.getInt();
+        final int originalLen = src.getInt();
+        validateLength(compressedLen, originalLen);
+
+        if (dst.capacity() - prevDstOff < originalLen) {
+            throw new InsufficientBufferException("Buffer length too small");
+        }
+
+        if (src.limit() - prevSrcOff - HEADER_LENGTH < compressedLen) {
+            throw new DataCorruptionException("Source data is not integral for decompression.");
+        }
+        src.limit(prevSrcOff + compressedLen + HEADER_LENGTH);
+        try {
+            internalDecompressor.decompress(src, dst);
+            int originalLen2 = dst.position() - prevDstOff;
+            if (originalLen != originalLen2) {
+                throw new DataCorruptionException(
+                        "Input is corrupted, unexpected original length.");
+            }
+        } catch (MalformedInputException e) {
+            throw new DataCorruptionException("Input is corrupted", e);
+        }
+
+        return originalLen;
+    }
+
+    @Override
+    public int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws DataCorruptionException, InsufficientBufferException {
+        int compressedLen = readIntLE(src, srcOff);
+        int originalLen = readIntLE(src, srcOff + 4);
+        validateLength(compressedLen, originalLen);
+
+        if (dst.length - dstOff < originalLen) {
+            throw new InsufficientBufferException("Buffer length too small");
+        }
+
+        if (src.length - srcOff - HEADER_LENGTH < compressedLen) {
+            throw new DataCorruptionException("Source data is not integral for decompression.");
+        }
+
+        try {
+            final int originalLen2 =

Review Comment:
   Renamed.
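For reference, the 8-byte block header this decompressor consumes, as written by the compressor hunks earlier in the thread (both ints little-endian):

```java
// Header layout (HEADER_LENGTH == 8):
//   bytes 0..3  compressedLength
//   bytes 4..7  originalLength
// followed by compressedLength bytes of payload.
int compressedLen = readIntLE(src, srcOff);
int originalLen = readIntLE(src, srcOff + 4);
```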





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933469386


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.
+            if (!buffer.getMemorySegment().isOffHeap()) {
+                compressedLen =
+                        blockCompressor.compress(
+                                buffer.getMemorySegment().getArray(),
+                                buffer.getMemorySegmentOffset(),
+                                length,
+                                internalBuffer.getNioBuffer(0, internalBuffer.capacity()).array(),

Review Comment:
   Fixed.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933420667


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);
+        }
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            int maxCompressedLength = internalCompressor.maxCompressedLength(srcLen);
+
+            if (dst.length < dstOff + maxCompressedLength - 1) {

Review Comment:
   Good catch, I will use `getMaxCompressedSize(srcLen)` instead of `internalCompressor.maxCompressedLength(srcLen)`, as the former takes the header size into account.





[GitHub] [flink] wsry commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r934105686


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);

Review Comment:
   I mean the second one. IMO, `BufferCompressionException` and `BufferDecompressionException` can cover `InsufficientBufferException`; they simply mean that an exception was thrown while compressing or decompressing data. I think a coarse-grained exception type is enough here because the `caused by` will give the root cause. Maybe a fine-grained exception type would be better, but then we would have to detect the insufficient-buffer case ourselves, which can be inaccurate, and an inaccurate exception type causes confusion.
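The coarse-grained handling argued for here is what the `AirBlockCompressor` hunk above already does; the essence is a single wrapper type with the root cause preserved:

```java
try {
    internalCompressor.compress(src, dst);
    // ... write the length header ...
} catch (Exception e) {
    // One exception type for all compression failures; "caused by" carries the real reason.
    throw new BufferCompressionException(e);
}
```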





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933487355


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +103,22 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        // ByteBuffer nioBuffer = buffer.getNioBuffer(0, length);
+        if (!buffer.getMemorySegment().isOffHeap()) {

Review Comment:
   Done.





[GitHub] [flink] wsry commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r934106967


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.
+            if (!buffer.getMemorySegment().isOffHeap()) {

Review Comment:
   I think that is a good suggestion.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933400682


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {

Review Comment:
   Ok, removed it.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933430725


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink decompressor that wrap {@link io.airlift.compress.Decompressor}. */
+public class AirDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;

Review Comment:
   Ok.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933396817


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {

Review Comment:
   Renamed it.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933483230


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.
+            if (!buffer.getMemorySegment().isOffHeap()) {

Review Comment:
   Good suggestion. In addition, for `internalBuffer`, can we promote the heap array to a class field named `internalBufferArray` and use it directly instead of `internalBuffer.array()` in the `!buffer.getMemorySegment().isOffHeap()` branch, to avoid `ensureAccessible` every time, since we take care of the ref count of the internal buffer?






[GitHub] [flink] flinkbot commented on pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
flinkbot commented on PR #20216:
URL: https://github.com/apache/flink/pull/20216#issuecomment-1178628896

   <!--
   Meta data
   {
     "version" : 1,
     "metaDataEntries" : [ {
       "hash" : "1f8014bc99ccb75c05a1bb906f6dfa5029fb8c8c",
       "status" : "UNKNOWN",
       "url" : "TBD",
       "triggerID" : "1f8014bc99ccb75c05a1bb906f6dfa5029fb8c8c",
       "triggerType" : "PUSH"
     } ]
   }-->
   ## CI report:
   
   * 1f8014bc99ccb75c05a1bb906f6dfa5029fb8c8c UNKNOWN
   
   <details>
   <summary>Bot commands</summary>
     The @flinkbot bot supports the following commands:
   
    - `@flinkbot run azure` re-run the last Azure build
   </details>




[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933485441


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +103,22 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        // ByteBuffer nioBuffer = buffer.getNioBuffer(0, length);

Review Comment:
   Done.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933428586


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;

Review Comment:
   Changed to `CompressorUtils.HEADER_LENGTH`.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933489928


##########
flink-runtime/pom.xml:
##########
@@ -165,6 +165,13 @@ under the License.
 			<artifactId>lz4-java</artifactId>
 		</dependency>
 
+		<!-- air compression library -->
+		<dependency>
+			<groupId>io.airlift</groupId>
+			<artifactId>aircompressor</artifactId>
+			<version>0.21</version>
+		</dependency>

Review Comment:
   Good catch, I will shade it.





[GitHub] [flink] wsry commented on pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
wsry commented on PR #20216:
URL: https://github.com/apache/flink/pull/20216#issuecomment-1202086980

   @reswqa Thanks for the update. LGTM.




[GitHub] [flink] wsry commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
wsry commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r932955133


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;

Review Comment:
   nit: private final



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);
+        }
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            int maxCompressedLength = internalCompressor.maxCompressedLength(srcLen);
+
+            if (dst.length < dstOff + maxCompressedLength - 1) {
+                throw new ArrayIndexOutOfBoundsException();
+            }
+
+            int compressedLength =
+                    internalCompressor.compress(
+                            src,
+                            srcOff,
+                            srcLen,
+                            dst,
+                            dstOff + HEADER_LENGTH,
+                            internalCompressor.maxCompressedLength(srcLen));

Review Comment:
   I am not sure about the meaning of the last parameter ```maxOutputLength```; does it mean the number of bytes that can be written to the dst array?



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java:
##########
@@ -37,12 +48,24 @@
  * Tests for {@link CompressedHeaderlessChannelReaderInputView} and {@link
  * CompressedHeaderlessChannelWriterOutputView}.
  */
+@RunWith(Parameterized.class)
 public class CompressedHeaderlessChannelTest {
     private static final int BUFFER_SIZE = 256;
 
     private IOManager ioManager;
 
-    private BlockCompressionFactory compressionFactory = new Lz4BlockCompressionFactory();
+    @Parameterized.Parameter public static BlockCompressionFactory compressionFactory;
+
+    @Parameterized.Parameters(name = "compressionFactory = {0}")
+    public static BlockCompressionFactory[] compressionFactory() {
+        return new BlockCompressionFactory[] {

Review Comment:
   I would suggest not creating new compression factory instances directly; instead, we can use BlockCompressionFactory#createBlockCompressionFactory.
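
   For illustration, a minimal sketch of that suggestion (assuming the lookup method accepts the enum names used in this PR):

   ```java
   @Parameterized.Parameters(name = "compressionFactory = {0}")
   public static BlockCompressionFactory[] compressionFactory() {
       // Resolve factories by name instead of instantiating
       // implementation classes directly.
       return new BlockCompressionFactory[] {
           BlockCompressionFactory.createBlockCompressionFactory("LZ4"),
           BlockCompressionFactory.createBlockCompressionFactory("LZO"),
           BlockCompressionFactory.createBlockCompressionFactory("Z_STD")
       };
   }
   ```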



##########
flink-runtime/pom.xml:
##########
@@ -165,6 +165,13 @@ under the License.
 			<artifactId>lz4-java</artifactId>
 		</dependency>
 
+		<!-- air compression library -->
+		<dependency>
+			<groupId>io.airlift</groupId>
+			<artifactId>aircompressor</artifactId>
+			<version>0.21</version>
+		</dependency>

Review Comment:
   We need to shade this dependency to avoid version conflicts with other modules packaged in dist.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +103,22 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        // ByteBuffer nioBuffer = buffer.getNioBuffer(0, length);
+        if (!buffer.getMemorySegment().isOffHeap()) {
+            return blockDecompressor.decompress(
+                    buffer.getMemorySegment().getArray(),
+                    buffer.getMemorySegmentOffset(),
+                    length,
+                    internalBuffer.getNioBuffer(0, internalBuffer.capacity()).array(),

Review Comment:
   nit: may use internalBuffer.getMemorySegment().getArray().



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink decompressor that wrap {@link io.airlift.compress.Decompressor}. */
+public class AirDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;
+
+    public AirDecompressor(Decompressor internalDecompressor) {
+        this.internalDecompressor = internalDecompressor;
+    }
+
+    @Override
+    public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws DataCorruptionException, InsufficientBufferException {
+        final int prevSrcOff = src.position() + srcOff;
+        final int prevDstOff = dst.position() + dstOff;
+
+        src.position(prevSrcOff);
+        dst.position(prevDstOff);
+        src.order(ByteOrder.LITTLE_ENDIAN);
+        final int compressedLen = src.getInt();
+        final int originalLen = src.getInt();
+        validateLength(compressedLen, originalLen);
+
+        if (dst.capacity() - prevDstOff < originalLen) {
+            throw new InsufficientBufferException("Buffer length too small");
+        }
+
+        if (src.limit() - prevSrcOff - HEADER_LENGTH < compressedLen) {
+            throw new DataCorruptionException("Source data is not integral for decompression.");
+        }
+        src.limit(prevSrcOff + compressedLen + HEADER_LENGTH);
+        try {
+            internalDecompressor.decompress(src, dst);
+            int originalLen2 = dst.position() - prevDstOff;
+            if (originalLen != originalLen2) {
+                throw new DataCorruptionException(
+                        "Input is corrupted, unexpected original length.");
+            }
+        } catch (MalformedInputException e) {
+            throw new DataCorruptionException("Input is corrupted", e);
+        }
+
+        return originalLen;
+    }
+
+    @Override
+    public int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws DataCorruptionException, InsufficientBufferException {
+        int compressedLen = readIntLE(src, srcOff);
+        int originalLen = readIntLE(src, srcOff + 4);
+        validateLength(compressedLen, originalLen);
+
+        if (dst.length - dstOff < originalLen) {
+            throw new InsufficientBufferException("Buffer length too small");
+        }
+
+        if (src.length - srcOff - HEADER_LENGTH < compressedLen) {
+            throw new DataCorruptionException("Source data is not integral for decompression.");
+        }
+
+        try {
+            final int originalLen2 =

Review Comment:
   nit: may rename originalLen2 to decompressedLen



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -92,7 +92,7 @@ public class NettyShuffleEnvironmentOptions {
     public static final ConfigOption<String> SHUFFLE_COMPRESSION_CODEC =
             key("taskmanager.network.compression.codec")
                     .stringType()
-                    .defaultValue("LZ4")
+                    .defaultValue("Z_STD")

Review Comment:
   1. Revert this default value change and add more description for the codec option. For example, list the three supported codecs and describe the comparison of their compression ratio and performance.
   2. Currently, this option is ExcludeFromDocumentation; we can make it experimental. After that, you can regenerate the documentation.
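
   As a sketch of what the documented option could look like (the description wording and the @Experimental annotation are suggestions, not code from this PR):

   ```java
   @Experimental
   public static final ConfigOption<String> SHUFFLE_COMPRESSION_CODEC =
           key("taskmanager.network.compression.codec")
                   .stringType()
                   .defaultValue("LZ4")
                   .withDescription(
                           "The codec to be used when compressing shuffle data."
                                   + " Supported codecs are LZ4, LZO and Z_STD:"
                                   + " LZ4 compresses and decompresses fastest,"
                                   + " while Z_STD achieves the best compression"
                                   + " ratio at a higher CPU cost.");
   ```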



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */

Review Comment:
   wrap -> wraps



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);

Review Comment:
   nit: AirCompressorFactory.HEADER_LENGTH -> HEADER_LENGTH



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;

Review Comment:
   Should use AirCompressorFactory.HEADER_LENGTH



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);
+        }
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            int maxCompressedLength = internalCompressor.maxCompressedLength(srcLen);
+
+            if (dst.length < dstOff + maxCompressedLength - 1) {

Review Comment:
   I think we should also consider the header length?
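
   That is, the check could look like (a sketch):

   ```java
   if (dst.length < dstOff + HEADER_LENGTH + maxCompressedLength) {
       throw new ArrayIndexOutOfBoundsException();
   }
   ```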



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {

Review Comment:
   May rename to AirBlockCompressor



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressorFactory.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
+
+/**
+ * {@link BlockCompressionFactory} to create wrapped {@link Compressor} and {@link Decompressor}.
+ */
+public class AirCompressorFactory implements BlockCompressionFactory {
+    /**
+     * We put two integers before each compressed block, the first integer represents the compressed
+     * length of the block, and the second one represents the original length of the block.
+     */
+    public static final int HEADER_LENGTH = 8;

Review Comment:
   Maybe move this to BlockCompressionFactory and share with Lz4BlockCompressionFactory or CompressorUtils?
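
   For context, the 8-byte header that this constant describes can be written out as follows (a sketch; the (value, buffer, offset) signature of the writeIntLE helper from CompressorUtils is assumed):

   ```java
   // Header preceding each compressed block:
   //   bytes [off, off + 4)      compressed length of the block, little-endian
   //   bytes [off + 4, off + 8)  original length of the block, little-endian
   writeIntLE(compressedLength, dst, dstOff);
   writeIntLE(srcLen, dst, dstOff + 4);
   ```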



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;

Review Comment:
   Should use AirCompressorFactory.HEADER_LENGTH



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {

Review Comment:
   I would suggest removing this isInsufficientBuffer check and throwing the exception directly. The isInsufficientBuffer method relies on an internal message string, which can be unstable when upgrading the aircompressor version.
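
   A sketch of the simplified handling (illustrative only):

   ```java
   } catch (IllegalArgumentException
           | ArrayIndexOutOfBoundsException
           | BufferOverflowException e) {
       // Wrap uniformly instead of classifying by exception message.
       throw new InsufficientBufferException(e);
   }
   ```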



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);

Review Comment:
   Though not introduced by this patch, this exception type is a little confusing. In the long run, I would suggest replacing it with something like BufferCompressionException.
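
   A possible shape for such an exception (purely hypothetical, not part of this PR):

   ```java
   /** Hypothetical replacement for InsufficientBufferException. */
   public class BufferCompressionException extends RuntimeException {

       public BufferCompressionException(String message) {
           super(message);
       }

       public BufferCompressionException(Throwable cause) {
           super(cause);
       }
   }
   ```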



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink decompressor that wrap {@link io.airlift.compress.Decompressor}. */
+public class AirDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;

Review Comment:
   wrap {@link io.airlift.compress.Decompressor} -> wraps {@link Decompressor}



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressionFactory.java:
##########
@@ -34,7 +43,11 @@ public interface BlockCompressionFactory {
 
     /** Name of {@link BlockCompressionFactory}. */
     enum CompressionFactoryName {
-        LZ4
+        LZ4,
+        LZ4_JAVA,
+        Z_STD,
+        LZO,
+        SNAPPY

Review Comment:
   After testing with TPC-DS, I found that SNAPPY and LZ4_JAVA have performance and compression ratio similar to LZ4 (performance slightly worse, compression ratio slightly higher), so I think we can remove LZ4_JAVA and SNAPPY.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.
+            if (!buffer.getMemorySegment().isOffHeap()) {

Review Comment:
   nit: may make buffer.getMemorySegment() a local variable and reuse it later.
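
   For example (a sketch of the same logic with the segment cached in a local):

   ```java
   MemorySegment segment = buffer.getMemorySegment();
   if (!segment.isOffHeap()) {
       compressedLen =
               blockCompressor.compress(
                       segment.getArray(),
                       buffer.getMemorySegmentOffset(),
                       length,
                       internalBuffer.getMemorySegment().getArray(),
                       0);
   }
   ```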



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java:
##########
@@ -18,32 +18,54 @@
 
 package org.apache.flink.runtime.io.compression;
 
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 
 import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
 import static org.junit.Assert.assertEquals;

Review Comment:
   May also migrate this test to JUnit 5 and AssertJ.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.
+            if (!buffer.getMemorySegment().isOffHeap()) {
+                compressedLen =
+                        blockCompressor.compress(
+                                buffer.getMemorySegment().getArray(),
+                                buffer.getMemorySegmentOffset(),
+                                length,
+                                internalBuffer.getNioBuffer(0, internalBuffer.capacity()).array(),

Review Comment:
   May use internalBuffer.getMemorySegment().getArray().



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java:
##########
@@ -18,32 +18,54 @@
 
 package org.apache.flink.runtime.io.compression;
 
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 
 import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
 import static org.junit.Assert.assertEquals;
 
 /** Tests for block compression. */
+@RunWith(Parameterized.class)
 public class BlockCompressionTest {
+    @Parameterized.Parameter public static BlockCompressionFactory compressionFactory;
+
+    @Parameterized.Parameters(name = "compressionFactory = {0}")
+    public static BlockCompressionFactory[] compressionFactory() {
+        return new BlockCompressionFactory[] {
+            new Lz4BlockCompressionFactory(),

Review Comment:
   I would suggest not creating new compression factory instances directly; instead, we can use BlockCompressionFactory#createBlockCompressionFactory.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink decompressor that wrap {@link io.airlift.compress.Decompressor}. */
+public class AirDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;
+
+    public AirDecompressor(Decompressor internalDecompressor) {
+        this.internalDecompressor = internalDecompressor;
+    }
+
+    @Override
+    public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws DataCorruptionException, InsufficientBufferException {
+        final int prevSrcOff = src.position() + srcOff;
+        final int prevDstOff = dst.position() + dstOff;
+
+        src.position(prevSrcOff);
+        dst.position(prevDstOff);
+        src.order(ByteOrder.LITTLE_ENDIAN);
+        final int compressedLen = src.getInt();
+        final int originalLen = src.getInt();
+        validateLength(compressedLen, originalLen);
+
+        if (dst.capacity() - prevDstOff < originalLen) {
+            throw new InsufficientBufferException("Buffer length too small");
+        }
+
+        if (src.limit() - prevSrcOff - HEADER_LENGTH < compressedLen) {
+            throw new DataCorruptionException("Source data is not integral for decompression.");
+        }
+        src.limit(prevSrcOff + compressedLen + HEADER_LENGTH);
+        try {
+            internalDecompressor.decompress(src, dst);
+            int originalLen2 = dst.position() - prevDstOff;

Review Comment:
   nit: local variable can be removed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +103,22 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        // ByteBuffer nioBuffer = buffer.getNioBuffer(0, length);
+        if (!buffer.getMemorySegment().isOffHeap()) {

Review Comment:
   nit: may make buffer.getMemorySegment() a local variable and reuse it later.
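
   For example (a sketch, mirroring the compressor side):

   ```java
   MemorySegment segment = buffer.getMemorySegment();
   if (!segment.isOffHeap()) {
       return blockDecompressor.decompress(
               segment.getArray(),
               buffer.getMemorySegmentOffset(),
               length,
               internalBuffer.getMemorySegment().getArray(),
               0);
   }
   ```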



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +103,22 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        // ByteBuffer nioBuffer = buffer.getNioBuffer(0, length);

Review Comment:
   1. Remove this line.
   2. May need to describe why we use a byte array instead of a nio buffer here. I think a nio buffer is a superset of a byte array.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.

Review Comment:
   May need to describe why we use a byte array instead of a nio buffer here. I think a nio buffer is a superset of a byte array.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933414752


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);
+        }
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            int maxCompressedLength = internalCompressor.maxCompressedLength(srcLen);
+
+            if (dst.length < dstOff + maxCompressedLength - 1) {
+                throw new ArrayIndexOutOfBoundsException();
+            }
+
+            int compressedLength =
+                    internalCompressor.compress(
+                            src,
+                            srcOff,
+                            srcLen,
+                            dst,
+                            dstOff + HEADER_LENGTH,
+                            internalCompressor.maxCompressedLength(srcLen));

Review Comment:
   Yes, it is used in some compression algorithms to compute the output array's limit address and perform bounds checks. `internalCompressor.maxCompressedLength(srcLen)` gives us an upper bound on this algorithm's compressed result, and in our scenario the output array is guaranteed to be sufficient (double the size of the input buffer), so I just use `internalCompressor.maxCompressedLength` to play the role of `maxOutputLength`.
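
   In other words, the call reduces to the following (a sketch of the contract as used here):

   ```java
   // Upper bound on the compressed size for this algorithm; dst is allocated
   // with at least HEADER_LENGTH + bound bytes available, so the bound can
   // safely serve as maxOutputLength.
   int bound = internalCompressor.maxCompressedLength(srcLen);
   int compressedLength =
           internalCompressor.compress(
                   src,
                   srcOff,
                   srcLen,
                   dst,
                   dstOff + HEADER_LENGTH,
                   bound); // maxOutputLength: bytes the compressor may write to dst
   ```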





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933450458


##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java:
##########
@@ -18,32 +18,54 @@
 
 package org.apache.flink.runtime.io.compression;
 
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 
 import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
 import static org.junit.Assert.assertEquals;

Review Comment:
   Sure, I will do this work.
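
   For reference, one possible JUnit 5 + AssertJ shape for the migrated test (a sketch; codec names and the factory lookup are assumptions based on this PR):

   ```java
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.ValueSource;

   import java.util.Random;

   import static org.assertj.core.api.Assertions.assertThat;

   /** Sketch of the parameterized round-trip test after migration. */
   class BlockCompressionTest {

       @ParameterizedTest(name = "codec = {0}")
       @ValueSource(strings = {"LZ4", "LZO", "Z_STD"})
       void compressAndDecompressBytes(String codec) {
           BlockCompressionFactory factory =
                   BlockCompressionFactory.createBlockCompressionFactory(codec);

           byte[] data = new byte[1024];
           new Random(0).nextBytes(data);

           // Compress into a buffer sized by the compressor's worst-case bound.
           byte[] compressed =
                   new byte[factory.getCompressor().getMaxCompressedSize(data.length)];
           int compressedLen =
                   factory.getCompressor().compress(data, 0, data.length, compressed, 0);

           // Decompress and verify the round trip restores the original bytes.
           byte[] restored = new byte[data.length];
           int restoredLen =
                   factory.getDecompressor()
                           .decompress(compressed, 0, compressedLen, restored, 0);

           assertThat(restoredLen).isEqualTo(data.length);
           assertThat(restored).isEqualTo(data);
       }
   }
   ```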





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933490907


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink decompressor that wrap {@link io.airlift.compress.Decompressor}. */
+public class AirDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;
+
+    public AirDecompressor(Decompressor internalDecompressor) {
+        this.internalDecompressor = internalDecompressor;
+    }
+
+    @Override
+    public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws DataCorruptionException, InsufficientBufferException {
+        final int prevSrcOff = src.position() + srcOff;
+        final int prevDstOff = dst.position() + dstOff;
+
+        src.position(prevSrcOff);
+        dst.position(prevDstOff);
+        src.order(ByteOrder.LITTLE_ENDIAN);
+        final int compressedLen = src.getInt();
+        final int originalLen = src.getInt();
+        validateLength(compressedLen, originalLen);
+
+        if (dst.capacity() - prevDstOff < originalLen) {
+            throw new InsufficientBufferException("Buffer length too small");
+        }
+
+        if (src.limit() - prevSrcOff - HEADER_LENGTH < compressedLen) {
+            throw new DataCorruptionException("Source data is not integral for decompression.");
+        }
+        src.limit(prevSrcOff + compressedLen + HEADER_LENGTH);
+        try {
+            internalDecompressor.decompress(src, dst);
+            int originalLen2 = dst.position() - prevDstOff;

Review Comment:
   Removed it.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933432278


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressionFactory.java:
##########
@@ -34,7 +43,11 @@ public interface BlockCompressionFactory {
 
     /** Name of {@link BlockCompressionFactory}. */
     enum CompressionFactoryName {
-        LZ4
+        LZ4,
+        LZ4_JAVA,
+        Z_STD,
+        LZO,
+        SNAPPY

Review Comment:
   Agreed, and thank you very much for your patient testing work.
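
   For readers skimming the thread, one hypothetical way these enum constants could be wired to concrete factories (the AirCompressorFactory(Compressor, Decompressor) constructor and this exact mapping are assumptions, not necessarily the merged code):

      switch (compressionFactoryName) {
          case LZ4:
              return new Lz4BlockCompressionFactory();
          case LZ4_JAVA:
              return new AirCompressorFactory(new Lz4Compressor(), new Lz4Decompressor());
          case Z_STD:
              return new AirCompressorFactory(new ZstdCompressor(), new ZstdDecompressor());
          case LZO:
              return new AirCompressorFactory(new LzoCompressor(), new LzoDecompressor());
          case SNAPPY:
              return new AirCompressorFactory(new SnappyCompressor(), new SnappyDecompressor());
          default:
              throw new IllegalArgumentException(
                      "Unknown CompressionFactoryName: " + compressionFactoryName);
      }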





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933399129


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);

Review Comment:
   Done.
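
   For a sense of scale: LZ4's worst case for incompressible input is roughly srcSize + srcSize/255 + 16 bytes, so a 32 KiB block needs at most 32768 + 128 + 16 = 32912 bytes of compressed payload plus the 8-byte header, i.e. 32920 bytes in total (the exact bound depends on the wrapped Compressor implementation).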





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933397448


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */

Review Comment:
   Fixed.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933426072


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressorFactory.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
+
+/**
+ * {@link BlockCompressionFactory} to create wrapped {@link Compressor} and {@link Decompressor}.
+ */
+public class AirCompressorFactory implements BlockCompressionFactory {
+    /**
+     * We put two integers before each compressed block, the first integer represents the compressed
+     * length of the block, and the second one represents the original length of the block.
+     */
+    public static final int HEADER_LENGTH = 8;

Review Comment:
   Extracted this field into CompressorUtils.
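
   A minimal sketch of that 8-byte header layout (illustrative only; it mirrors the little-endian putInt calls in the AirCompressor diff above):

      static void writeHeader(ByteBuffer dst, int compressedLen, int originalLen) {
          dst.order(ByteOrder.LITTLE_ENDIAN);
          dst.putInt(compressedLen); // bytes 0-3: length of the compressed payload
          dst.putInt(originalLen);   // bytes 4-7: length of the original block
      }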





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933461374


##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java:
##########
@@ -37,12 +48,24 @@
  * Tests for {@link CompressedHeaderlessChannelReaderInputView} and {@link
  * CompressedHeaderlessChannelWriterOutputView}.
  */
+@RunWith(Parameterized.class)
 public class CompressedHeaderlessChannelTest {
     private static final int BUFFER_SIZE = 256;
 
     private IOManager ioManager;
 
-    private BlockCompressionFactory compressionFactory = new Lz4BlockCompressionFactory();
+    @Parameterized.Parameter public static BlockCompressionFactory compressionFactory;
+
+    @Parameterized.Parameters(name = "compressionFactory = {0}")
+    public static BlockCompressionFactory[] compressionFactory() {
+        return new BlockCompressionFactory[] {

Review Comment:
   Done.
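
   One JUnit 4 detail worth flagging in the diff above: the @Parameterized.Parameter javadoc requires the annotated field to be public and non-static, so the conventional shape is an instance field (sketch only):

      @Parameterized.Parameter
      public BlockCompressionFactory compressionFactory; // injected once per parameter set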





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933496841


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);

Review Comment:
   Does `BufferCompressionException` have the same semantics as `InsufficientBufferException` (i.e., there is not enough buffer space to compress or decompress a buffer into another buffer)? Or does `BufferCompressionException` mean that a buffer cannot be compressed or decompressed for some reason? If it is the latter, I will catch `Exception e` directly in the catch block and convert it into `BufferCompressionException`; otherwise, I will only catch the exceptions related to insufficient buffer space and turn those into `BufferCompressionException`.
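
   A sketch of the second interpretation described above (names follow the discussion, not necessarily the merged code):

      try {
          internalCompressor.compress(src, dst);
      } catch (Exception e) {
          // Any failure to compress, not only insufficient buffer space,
          // is surfaced uniformly to the caller.
          throw new BufferCompressionException(e);
      }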





[GitHub] [flink] reswqa commented on pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on PR #20216:
URL: https://github.com/apache/flink/pull/20216#issuecomment-1178625674

   @wsry Would you like to help review this pull request?




[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933396506


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;

Review Comment:
   Done.
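
   (Presumably by tightening the field's visibility, e.g. `private final Compressor internalCompressor;`, the usual style for an injected collaborator; the exact change is not quoted here.)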





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933428994


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;

Review Comment:
   Changed to `CompressorUtils.HEADER_LENGTH`.





[GitHub] [flink] reswqa commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
reswqa commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r933466874


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.

Review Comment:
   Added some comments explaining the reason.
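
   For context, a sketch of the heap fast path being commented on (assuming a byte[]-based compress overload on BlockCompressor and a heap-backed internal buffer named internalBufferArray; details may differ from the final code):

      MemorySegment segment = buffer.getMemorySegment();
      if (!segment.isOffHeap()) {
          // Heap case: hand the backing array straight to the compressor and skip
          // the ByteBuffer wrapping, which can be cheaper on this hot path.
          compressedLen = blockCompressor.compress(
                  segment.getArray(), buffer.getMemorySegmentOffset(), length,
                  internalBufferArray, 0);
      } else {
          // Off-heap case: keep the original ByteBuffer-based path.
          compressedLen = blockCompressor.compress(
                  buffer.getNioBuffer(0, length), 0, length,
                  internalBuffer.getNioBuffer(0, internalBuffer.capacity()), 0);
      }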





[GitHub] [flink] wsry closed pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression

Posted by GitBox <gi...@apache.org>.
wsry closed pull request #20216: [FLINK-28382] Introduce more compression algorithm based on aircompression
URL: https://github.com/apache/flink/pull/20216

