Posted to issues@flink.apache.org by GitBox <gi...@apache.org> on 2022/07/29 11:29:17 UTC

[GitHub] [flink] wsry commented on a diff in pull request #20216: [FLINK-28382] Introduce more compression algorithms based on aircompressor

wsry commented on code in PR #20216:
URL: https://github.com/apache/flink/pull/20216#discussion_r932955133


##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;

Review Comment:
   nit: private final



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);
+        }
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            int maxCompressedLength = internalCompressor.maxCompressedLength(srcLen);
+
+            if (dst.length < dstOff + maxCompressedLength - 1) {
+                throw new ArrayIndexOutOfBoundsException();
+            }
+
+            int compressedLength =
+                    internalCompressor.compress(
+                            src,
+                            srcOff,
+                            srcLen,
+                            dst,
+                            dstOff + HEADER_LENGTH,
+                            internalCompressor.maxCompressedLength(srcLen));

Review Comment:
   I am not sure about the meaning of the last parameter ```maxOutputLength```. Does it mean the number of bytes that can be written into the dst array?
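   For reference, if I read the aircompressor sources correctly (worth double-checking against the 0.21 release), the wrapped interface looks roughly like this:

   ```java
   // io.airlift.compress.Compressor, as I understand it in aircompressor 0.21:
   public interface Compressor {
       int maxCompressedLength(int uncompressedLength);

       // Returns the actual compressed size. maxOutputLength appears to be the
       // maximum number of bytes the compressor may write into output starting
       // at outputOffset, i.e. an upper bound used for bounds checking.
       int compress(byte[] input, int inputOffset, int inputLength,
                    byte[] output, int outputOffset, int maxOutputLength);

       void compress(java.nio.ByteBuffer input, java.nio.ByteBuffer output);
   }
   ```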



##########
flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java:
##########
@@ -37,12 +48,24 @@
  * Tests for {@link CompressedHeaderlessChannelReaderInputView} and {@link
  * CompressedHeaderlessChannelWriterOutputView}.
  */
+@RunWith(Parameterized.class)
 public class CompressedHeaderlessChannelTest {
     private static final int BUFFER_SIZE = 256;
 
     private IOManager ioManager;
 
-    private BlockCompressionFactory compressionFactory = new Lz4BlockCompressionFactory();
+    @Parameterized.Parameter public static BlockCompressionFactory compressionFactory;
+
+    @Parameterized.Parameters(name = "compressionFactory = {0}")
+    public static BlockCompressionFactory[] compressionFactory() {
+        return new BlockCompressionFactory[] {

Review Comment:
   I would suggest not creating new compression factory instances directly; instead, we can use BlockCompressionFactory#createBlockCompressionFactory.
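   A minimal sketch of what I mean (assuming the static helper accepts the codec names from the CompressionFactoryName enum):

   ```java
   // Instead of instantiating a concrete factory directly:
   BlockCompressionFactory factory =
           BlockCompressionFactory.createBlockCompressionFactory("LZ4");
   ```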



##########
flink-runtime/pom.xml:
##########
@@ -165,6 +165,13 @@ under the License.
 			<artifactId>lz4-java</artifactId>
 		</dependency>
 
+		<!-- air compression library -->
+		<dependency>
+			<groupId>io.airlift</groupId>
+			<artifactId>aircompressor</artifactId>
+			<version>0.21</version>
+		</dependency>

Review Comment:
   We need to shade this dependency to avoid version conflicts with other modules packaged in dist.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +103,22 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        // ByteBuffer nioBuffer = buffer.getNioBuffer(0, length);
+        if (!buffer.getMemorySegment().isOffHeap()) {
+            return blockDecompressor.decompress(
+                    buffer.getMemorySegment().getArray(),
+                    buffer.getMemorySegmentOffset(),
+                    length,
+                    internalBuffer.getNioBuffer(0, internalBuffer.capacity()).array(),

Review Comment:
   nit: may use internalBuffer.getMemorySegment().getArray().



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink decompressor that wrap {@link io.airlift.compress.Decompressor}. */
+public class AirDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;
+
+    public AirDecompressor(Decompressor internalDecompressor) {
+        this.internalDecompressor = internalDecompressor;
+    }
+
+    @Override
+    public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws DataCorruptionException, InsufficientBufferException {
+        final int prevSrcOff = src.position() + srcOff;
+        final int prevDstOff = dst.position() + dstOff;
+
+        src.position(prevSrcOff);
+        dst.position(prevDstOff);
+        src.order(ByteOrder.LITTLE_ENDIAN);
+        final int compressedLen = src.getInt();
+        final int originalLen = src.getInt();
+        validateLength(compressedLen, originalLen);
+
+        if (dst.capacity() - prevDstOff < originalLen) {
+            throw new InsufficientBufferException("Buffer length too small");
+        }
+
+        if (src.limit() - prevSrcOff - HEADER_LENGTH < compressedLen) {
+            throw new DataCorruptionException("Source data is not integral for decompression.");
+        }
+        src.limit(prevSrcOff + compressedLen + HEADER_LENGTH);
+        try {
+            internalDecompressor.decompress(src, dst);
+            int originalLen2 = dst.position() - prevDstOff;
+            if (originalLen != originalLen2) {
+                throw new DataCorruptionException(
+                        "Input is corrupted, unexpected original length.");
+            }
+        } catch (MalformedInputException e) {
+            throw new DataCorruptionException("Input is corrupted", e);
+        }
+
+        return originalLen;
+    }
+
+    @Override
+    public int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws DataCorruptionException, InsufficientBufferException {
+        int compressedLen = readIntLE(src, srcOff);
+        int originalLen = readIntLE(src, srcOff + 4);
+        validateLength(compressedLen, originalLen);
+
+        if (dst.length - dstOff < originalLen) {
+            throw new InsufficientBufferException("Buffer length too small");
+        }
+
+        if (src.length - srcOff - HEADER_LENGTH < compressedLen) {
+            throw new DataCorruptionException("Source data is not integral for decompression.");
+        }
+
+        try {
+            final int originalLen2 =

Review Comment:
   nit: may rename originalLen2 to decompressedLen



##########
flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java:
##########
@@ -92,7 +92,7 @@ public class NettyShuffleEnvironmentOptions {
     public static final ConfigOption<String> SHUFFLE_COMPRESSION_CODEC =
             key("taskmanager.network.compression.codec")
                     .stringType()
-                    .defaultValue("LZ4")
+                    .defaultValue("Z_STD")

Review Comment:
   1. Revert this test change and add more description for the codec option. For example, list the three supported codecs and compare their compression ratio and performance (a rough sketch follows below).
   2. Currently, this option is ExcludeFromDocumentation; we can make it experimental instead. After that, you can regenerate the documentation.
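   A rough sketch of point 1 (the wording and the way "experimental" is expressed are only illustrative, not final):

   ```java
   @Experimental
   public static final ConfigOption<String> SHUFFLE_COMPRESSION_CODEC =
           key("taskmanager.network.compression.codec")
                   .stringType()
                   .defaultValue("LZ4")
                   .withDescription(
                           "The codec to use when compressing shuffle data. Supported"
                                   + " codecs are LZ4, LZO and ZSTD: LZ4 is typically"
                                   + " the fastest, ZSTD gives the best compression"
                                   + " ratio, and LZO sits in between.");
   ```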



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */

Review Comment:
   wrap -> wraps



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);

Review Comment:
   nit: AirCompressorFactory.HEADER_LENGTH -> HEADER_LENGTH



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;

Review Comment:
   Should use AirCompressorFactory.HEADER_LENGTH



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);
+        }
+    }
+
+    @Override
+    public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            int maxCompressedLength = internalCompressor.maxCompressedLength(srcLen);
+
+            if (dst.length < dstOff + maxCompressedLength - 1) {

Review Comment:
   I think we should also account for the header length here?
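   Something like the following, as a sketch:

   ```java
   // Reserve room for both the header and the worst-case compressed payload:
   if (dst.length < dstOff + HEADER_LENGTH + maxCompressedLength) {
       throw new ArrayIndexOutOfBoundsException();
   }
   ```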



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {

Review Comment:
   May rename to AirBlockCompressor



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressorFactory.java:
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
+
+/**
+ * {@link BlockCompressionFactory} to create wrapped {@link Compressor} and {@link Decompressor}.
+ */
+public class AirCompressorFactory implements BlockCompressionFactory {
+    /**
+     * We put two integers before each compressed block, the first integer represents the compressed
+     * length of the block, and the second one represents the original length of the block.
+     */
+    public static final int HEADER_LENGTH = 8;

Review Comment:
   Maybe move this to BlockCompressionFactory (or CompressorUtils) so it can be shared with Lz4BlockCompressionFactory?
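   For example (placement and javadoc wording are just a sketch):

   ```java
   public interface BlockCompressionFactory {

       /**
        * Each compressed block is preceded by two little-endian integers: the
        * compressed length and the original length of the block.
        */
       int HEADER_LENGTH = 8;

       // existing factory methods ...
   }
   ```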



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;

Review Comment:
   Should use AirCompressorFactory.HEADER_LENGTH



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {

Review Comment:
   I would suggest removing this isInsufficientBuffer check and throwing the exception directly. The isInsufficientBuffer method relies on an internal message string, which can be unstable when upgrading the aircompressor version.
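   i.e. roughly (just a sketch):

   ```java
   } catch (IllegalArgumentException
           | ArrayIndexOutOfBoundsException
           | BufferOverflowException e) {
       // No message-string sniffing; treat all of these as insufficient buffer.
       throw new InsufficientBufferException(e);
   }
   ```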



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressor.java:
##########
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Compressor;
+
+import java.nio.BufferOverflowException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink compressor that wrap {@link Compressor}. */
+public class AirCompressor implements BlockCompressor {
+    Compressor internalCompressor;
+
+    public AirCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
+    }
+
+    @Override
+    public int getMaxCompressedSize(int srcSize) {
+        return AirCompressorFactory.HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
+    }
+
+    @Override
+    public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws InsufficientBufferException {
+        try {
+            final int prevSrcOff = src.position() + srcOff;
+            final int prevDstOff = dst.position() + dstOff;
+
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
+
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
+
+            dst.position(prevDstOff);
+            dst.order(ByteOrder.LITTLE_ENDIAN);
+            dst.putInt(compressedLength);
+            dst.putInt(srcLen);
+            dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
+
+            return HEADER_LENGTH + compressedLength;
+        } catch (IllegalArgumentException
+                | ArrayIndexOutOfBoundsException
+                | BufferOverflowException e) {
+            if (e instanceof IllegalArgumentException
+                    && !isInsufficientBuffer((IllegalArgumentException) e)) {
+                throw e;
+            }
+            throw new InsufficientBufferException(e);

Review Comment:
   Though not introduced by this patch, this exception type is a little confusing. In the long run, I would suggest replacing it with something like BufferCompressionException.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink decompressor that wrap {@link io.airlift.compress.Decompressor}. */
+public class AirDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;

Review Comment:
   wrap {@link io.airlift.compress.Decompressor} -> wraps {@link Decompressor}



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressionFactory.java:
##########
@@ -34,7 +43,11 @@ public interface BlockCompressionFactory {
 
     /** Name of {@link BlockCompressionFactory}. */
     enum CompressionFactoryName {
-        LZ4
+        LZ4,
+        LZ4_JAVA,
+        Z_STD,
+        LZO,
+        SNAPPY

Review Comment:
   After testing with TPC-DS, I found that SNAPPY and LZ4_JAVA have performance and compression ratios similar to LZ4 (performance slightly worse, compression ratio slightly higher), so I think we can remove LZ4_JAVA and SNAPPY.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.
+            if (!buffer.getMemorySegment().isOffHeap()) {

Review Comment:
   nit: may make buffer.getMemorySegment() a local variable and reuse it later.
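   e.g. (sketch):

   ```java
   MemorySegment segment = buffer.getMemorySegment();
   if (!segment.isOffHeap()) {
       // ... use segment.getArray() and buffer.getMemorySegmentOffset() here
   }
   ```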



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java:
##########
@@ -18,32 +18,54 @@
 
 package org.apache.flink.runtime.io.compression;
 
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 
 import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
 import static org.junit.Assert.assertEquals;

Review Comment:
   May also migrate this test to JUnit 5 and AssertJ.
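   A minimal sketch of the parameterization in JUnit 5 + AssertJ (structure only; the codec names are taken from the new enum):

   ```java
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.ValueSource;

   import static org.assertj.core.api.Assertions.assertThat;

   class BlockCompressionTest {

       @ParameterizedTest(name = "compressionFactory = {0}")
       @ValueSource(strings = {"LZ4", "LZO", "Z_STD"})
       void compressAndDecompress(String codecName) {
           BlockCompressionFactory factory =
                   BlockCompressionFactory.createBlockCompressionFactory(codecName);
           assertThat(factory).isNotNull();
       }
   }
   ```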



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.
+            if (!buffer.getMemorySegment().isOffHeap()) {
+                compressedLen =
+                        blockCompressor.compress(
+                                buffer.getMemorySegment().getArray(),
+                                buffer.getMemorySegmentOffset(),
+                                length,
+                                internalBuffer.getNioBuffer(0, internalBuffer.capacity()).array(),

Review Comment:
   May use internalBuffer.getMemorySegment().getArray().



##########
flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java:
##########
@@ -18,32 +18,54 @@
 
 package org.apache.flink.runtime.io.compression;
 
+import io.airlift.compress.lz4.Lz4Compressor;
+import io.airlift.compress.lz4.Lz4Decompressor;
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.snappy.SnappyCompressor;
+import io.airlift.compress.snappy.SnappyDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
 import org.junit.Assert;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.nio.ByteBuffer;
 
 import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
 import static org.junit.Assert.assertEquals;
 
 /** Tests for block compression. */
+@RunWith(Parameterized.class)
 public class BlockCompressionTest {
+    @Parameterized.Parameter public static BlockCompressionFactory compressionFactory;
+
+    @Parameterized.Parameters(name = "compressionFactory = {0}")
+    public static BlockCompressionFactory[] compressionFactory() {
+        return new BlockCompressionFactory[] {
+            new Lz4BlockCompressionFactory(),

Review Comment:
   I would suggest to not create new compression factory instance directly, instead, we can use BlockCompressionFactory#createBlockCompressionFactory.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirDecompressor.java:
##########
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+
+/** Flink decompressor that wrap {@link io.airlift.compress.Decompressor}. */
+public class AirDecompressor implements BlockDecompressor {
+    Decompressor internalDecompressor;
+
+    public AirDecompressor(Decompressor internalDecompressor) {
+        this.internalDecompressor = internalDecompressor;
+    }
+
+    @Override
+    public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws DataCorruptionException, InsufficientBufferException {
+        final int prevSrcOff = src.position() + srcOff;
+        final int prevDstOff = dst.position() + dstOff;
+
+        src.position(prevSrcOff);
+        dst.position(prevDstOff);
+        src.order(ByteOrder.LITTLE_ENDIAN);
+        final int compressedLen = src.getInt();
+        final int originalLen = src.getInt();
+        validateLength(compressedLen, originalLen);
+
+        if (dst.capacity() - prevDstOff < originalLen) {
+            throw new InsufficientBufferException("Buffer length too small");
+        }
+
+        if (src.limit() - prevSrcOff - HEADER_LENGTH < compressedLen) {
+            throw new DataCorruptionException("Source data is not integral for decompression.");
+        }
+        src.limit(prevSrcOff + compressedLen + HEADER_LENGTH);
+        try {
+            internalDecompressor.decompress(src, dst);
+            int originalLen2 = dst.position() - prevDstOff;

Review Comment:
   nit: local variable can be removed.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +103,22 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        // ByteBuffer nioBuffer = buffer.getNioBuffer(0, length);
+        if (!buffer.getMemorySegment().isOffHeap()) {

Review Comment:
   nit: may make buffer.getMemorySegment() a local variable and reuse it later.



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java:
##########
@@ -103,12 +103,22 @@ private int decompress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        // ByteBuffer nioBuffer = buffer.getNioBuffer(0, length);

Review Comment:
   1. Remove this commented-out line.
   2. May need to describe why we use a byte array instead of a nio buffer here; I think a nio buffer is a superset of a byte array (a possible comment is sketched below).
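   For point 2, a possible comment (my guess at the rationale, please verify):

   ```java
   // For heap buffers, pass the backing byte[] to the codec directly; this
   // avoids the ByteBuffer position/limit bookkeeping inside the codec and
   // any extra copy, at the cost of the explicit offset arithmetic here.
   ```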



##########
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java:
##########
@@ -107,15 +107,28 @@ private int compress(Buffer buffer) {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            // if buffer is in-heap, manipulate the underlying array directly.

Review Comment:
   May need to describe why we use a byte array instead of a nio buffer here; I think a nio buffer is a superset of a byte array.


