Posted to commits@flink.apache.org by yi...@apache.org on 2022/08/02 06:50:14 UTC

[flink] 02/02: [FLINK-28382][network] Introduce LZO and ZSTD compression based on aircompressor for blocking shuffle

This is an automated email from the ASF dual-hosted git repository.

yingjie pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 47d0b6d26c052a817a66f7b719eecf01387cb0d3
Author: Weijie Guo <re...@163.com>
AuthorDate: Sat Jul 30 00:53:52 2022 +0800

    [FLINK-28382][network] Introduce LZO and ZSTD compression based on aircompressor for blocking shuffle
    
    This closes #20216.
---
 .../generated/all_taskmanager_network_section.html |   6 ++
 .../netty_shuffle_environment_configuration.html   |   6 ++
 .../NettyShuffleEnvironmentOptions.java            |  14 ++-
 flink-runtime/pom.xml                              |  38 +++++++-
 ...lockCompressor.java => AirBlockCompressor.java} |  76 +++++++--------
 .../io/compression/AirBlockDecompressor.java       | 103 +++++++++++++++++++++
 ...ssionFactory.java => AirCompressorFactory.java} |  25 +++--
 .../io/compression/BlockCompressionFactory.java    |  19 +++-
 .../runtime/io/compression/BlockCompressor.java    |   8 +-
 .../runtime/io/compression/BlockDecompressor.java  |  10 +-
 ...eption.java => BufferCompressionException.java} |  14 +--
 ...tion.java => BufferDecompressionException.java} |  15 ++-
 ...ompressionFactory.java => CompressorUtils.java} |  31 +++++--
 .../io/compression/Lz4BlockCompressionFactory.java |   7 --
 .../runtime/io/compression/Lz4BlockCompressor.java |  24 ++---
 .../io/compression/Lz4BlockDecompressor.java       |  40 ++++----
 .../io/network/buffer/BufferCompressor.java        |  47 +++++++---
 .../io/network/buffer/BufferDecompressor.java      |  40 ++++++--
 flink-runtime/src/main/resources/META-INF/NOTICE   |   9 ++
 .../io/compression/BlockCompressionTest.java       |  22 +++--
 .../io/network/buffer/BufferCompressionTest.java   |  12 +++
 ...editBasedPartitionRequestClientHandlerTest.java |   8 +-
 .../NettyMessageClientSideSerializationTest.java   |  21 +++--
 .../partition/consumer/SingleInputGateTest.java    |   8 +-
 .../io/CompressedHeaderlessChannelTest.java        |  15 ++-
 25 files changed, 437 insertions(+), 181 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
index a6705820858..a626890a3fc 100644
--- a/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
+++ b/docs/layouts/shortcodes/generated/all_taskmanager_network_section.html
@@ -20,6 +20,12 @@
             <td>String</td>
             <td>The blocking shuffle type, either "mmap" or "file". The "auto" means selecting the proper type automatically based on system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted for by configured memory limits, but some resource frameworks like YARN would track this memory usage and kill the container once memory usage exceeds some threshold. Also note that this option is experimental and might be changed in the future.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.compression.codec</h5></td>
+            <td style="word-wrap: break-word;">"LZ4"</td>
+            <td>String</td>
+            <td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "ZSTD" are supported now. Through TPC-DS tests of these three algorithms, the results show that the "LZ4" algorithm has the highest compression and decompression speed, but the lowest compression ratio. "ZSTD" has the highest compression ratio, but the slowest compression and decompression speed, and "LZO" is between the two. Also note that this option is experimental and might be changed in the future.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.network.detailed-metrics</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
index c9f19ff76ba..8bbd176d85e 100644
--- a/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
+++ b/docs/layouts/shortcodes/generated/netty_shuffle_environment_configuration.html
@@ -38,6 +38,12 @@
             <td>String</td>
             <td>The blocking shuffle type, either "mmap" or "file". The "auto" means selecting the proper type automatically based on system memory architecture (64 bit for mmap and 32 bit for file). Note that the memory usage of mmap is not accounted for by configured memory limits, but some resource frameworks like YARN would track this memory usage and kill the container once memory usage exceeds some threshold. Also note that this option is experimental and might be changed in the future.</td>
         </tr>
+        <tr>
+            <td><h5>taskmanager.network.compression.codec</h5></td>
+            <td style="word-wrap: break-word;">"LZ4"</td>
+            <td>String</td>
+            <td>The codec to be used when compressing shuffle data, only "LZ4", "LZO" and "ZSTD" are supported now. Through TPC-DS tests of these three algorithms, the results show that the "LZ4" algorithm has the highest compression and decompression speed, but the lowest compression ratio. "ZSTD" has the highest compression ratio, but the slowest compression and decompression speed, and "LZO" is between the two. Also note that this option is experimental and might be changed in the future.</td>
+        </tr>
         <tr>
             <td><h5>taskmanager.network.detailed-metrics</h5></td>
             <td style="word-wrap: break-word;">false</td>
diff --git a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
index 0286bc27254..51ba09faad3 100644
--- a/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
+++ b/flink-core/src/main/java/org/apache/flink/configuration/NettyShuffleEnvironmentOptions.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.configuration;
 
+import org.apache.flink.annotation.Experimental;
 import org.apache.flink.annotation.PublicEvolving;
 import org.apache.flink.annotation.docs.Documentation;
 
@@ -88,12 +89,21 @@ public class NettyShuffleEnvironmentOptions {
                                     + "ratio is high.");
 
     /** The codec to be used when compressing shuffle data. */
-    @Documentation.ExcludeFromDocumentation("Currently, LZ4 is the only legal option.")
+    @Documentation.Section(Documentation.Sections.ALL_TASK_MANAGER_NETWORK)
+    @Experimental
     public static final ConfigOption<String> SHUFFLE_COMPRESSION_CODEC =
             key("taskmanager.network.compression.codec")
                     .stringType()
                     .defaultValue("LZ4")
-                    .withDescription("The codec to be used when compressing shuffle data.");
+                    .withDescription(
+                            "The codec to be used when compressing shuffle data, only \"LZ4\", \"LZO\" "
+                                    + "and \"ZSTD\" are supported now. Through tpc-ds test of these "
+                                    + "three algorithms, the results show that \"LZ4\" algorithm has "
+                                    + "the highest compression and decompression speed, but the "
+                                    + "compression ratio is the lowest. \"ZSTD\" has the highest "
+                                    + "compression ratio, but the compression and decompression "
+                                    + "speed is the slowest, and LZO is between the two. Also note "
+                                    + "that this option is experimental and might be changed in the future.");
 
     /**
      * Boolean flag to enable/disable more detailed metrics about inbound/outbound network queue
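
For reference, the new codec option can also be set programmatically; a minimal
sketch (not part of this commit, assuming a standard Flink Configuration):

    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.configuration.NettyShuffleEnvironmentOptions;

    Configuration config = new Configuration();
    // Pick one of "LZ4" (the default), "LZO" or "ZSTD".
    config.set(NettyShuffleEnvironmentOptions.SHUFFLE_COMPRESSION_CODEC, "ZSTD");
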
diff --git a/flink-runtime/pom.xml b/flink-runtime/pom.xml
index 631a10f094a..86206ad4dd3 100644
--- a/flink-runtime/pom.xml
+++ b/flink-runtime/pom.xml
@@ -141,7 +141,7 @@ under the License.
 			<groupId>org.apache.commons</groupId>
 			<artifactId>commons-lang3</artifactId>
 		</dependency>
-		
+
 		<dependency>
 			<groupId>commons-cli</groupId>
 			<artifactId>commons-cli</artifactId>
@@ -165,6 +165,13 @@ under the License.
 			<artifactId>lz4-java</artifactId>
 		</dependency>
 
+		<!-- air compression library -->
+		<dependency>
+			<groupId>io.airlift</groupId>
+			<artifactId>aircompressor</artifactId>
+			<version>0.21</version>
+		</dependency>
+
 		<!-- test dependencies -->
 
 		<dependency>
@@ -303,6 +310,35 @@ under the License.
 					</execution>
 				</executions>
 			</plugin>
+
+			<plugin>
+				<groupId>org.apache.maven.plugins</groupId>
+				<artifactId>maven-shade-plugin</artifactId>
+				<executions>
+					<execution>
+						<id>shade-flink</id>
+						<phase>package</phase>
+						<goals>
+							<goal>shade</goal>
+						</goals>
+						<configuration>
+							<artifactSet>
+								<includes>
+									<include>io.airlift:aircompressor</include>
+								</includes>
+							</artifactSet>
+							<relocations>
+								<relocation>
+									<pattern>io.airlift.compress</pattern>
+									<shadedPattern>
+										org.apache.flink.shaded.io.airlift.compress
+									</shadedPattern>
+								</relocation>
+							</relocations>
+						</configuration>
+					</execution>
+				</executions>
+			</plugin>
 		</plugins>
 	</build>
 </project>
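
Because of the relocation above, the bundled aircompressor classes live under
the shaded namespace inside the flink-runtime jar; a quick way to verify (a
sketch, not part of this commit):

    // The shade plugin rewrites io.airlift.compress.* to the shaded pattern:
    Class.forName("org.apache.flink.shaded.io.airlift.compress.zstd.ZstdCompressor");
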
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockCompressor.java
similarity index 51%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressor.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockCompressor.java
index 86607c73d86..b4e1664e107 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockCompressor.java
@@ -18,53 +18,44 @@
 
 package org.apache.flink.runtime.io.compression;
 
-import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4Exception;
-import net.jpountz.lz4.LZ4Factory;
+import io.airlift.compress.Compressor;
 
-import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
 
-/**
- * Encode data into LZ4 format (not compatible with the LZ4 Frame format). It reads from and writes
- * to byte arrays provided from the outside, thus reducing copy time.
- *
- * <p>This class is copied and modified from {@link net.jpountz.lz4.LZ4BlockOutputStream}.
- */
-public class Lz4BlockCompressor implements BlockCompressor {
+/** Flink compressor that wraps {@link Compressor}. */
+public class AirBlockCompressor implements BlockCompressor {
+    private final Compressor internalCompressor;
 
-    private final LZ4Compressor compressor;
-
-    public Lz4BlockCompressor() {
-        this.compressor = LZ4Factory.fastestInstance().fastCompressor();
+    public AirBlockCompressor(Compressor internalCompressor) {
+        this.internalCompressor = internalCompressor;
     }
 
     @Override
     public int getMaxCompressedSize(int srcSize) {
-        return HEADER_LENGTH + compressor.maxCompressedLength(srcSize);
+        return HEADER_LENGTH + internalCompressor.maxCompressedLength(srcSize);
     }
 
     @Override
     public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
-            throws InsufficientBufferException {
+            throws BufferCompressionException {
         try {
+            if (dst.remaining() < dstOff + getMaxCompressedSize(srcLen)) {
+                throw new ArrayIndexOutOfBoundsException();
+            }
+
             final int prevSrcOff = src.position() + srcOff;
             final int prevDstOff = dst.position() + dstOff;
 
-            int maxCompressedSize = compressor.maxCompressedLength(srcLen);
-            int compressedLength =
-                    compressor.compress(
-                            src,
-                            prevSrcOff,
-                            srcLen,
-                            dst,
-                            prevDstOff + HEADER_LENGTH,
-                            maxCompressedSize);
+            src.position(prevSrcOff);
+            dst.position(prevDstOff + HEADER_LENGTH);
 
-            src.position(prevSrcOff + srcLen);
+            internalCompressor.compress(src, dst);
+
+            int compressedLength = dst.position() - prevDstOff - HEADER_LENGTH;
 
             dst.position(prevDstOff);
             dst.order(ByteOrder.LITTLE_ENDIAN);
@@ -73,29 +64,32 @@ public class Lz4BlockCompressor implements BlockCompressor {
             dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
 
             return HEADER_LENGTH + compressedLength;
-        } catch (LZ4Exception | ArrayIndexOutOfBoundsException | BufferOverflowException e) {
-            throw new InsufficientBufferException(e);
+        } catch (Exception e) {
+            throw new BufferCompressionException(e);
         }
     }
 
     @Override
     public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
-            throws InsufficientBufferException {
+            throws BufferCompressionException {
         try {
+            if (dst.length < dstOff + getMaxCompressedSize(srcLen)) {
+                throw new ArrayIndexOutOfBoundsException();
+            }
+
             int compressedLength =
-                    compressor.compress(src, srcOff, srcLen, dst, dstOff + HEADER_LENGTH);
+                    internalCompressor.compress(
+                            src,
+                            srcOff,
+                            srcLen,
+                            dst,
+                            dstOff + HEADER_LENGTH,
+                            internalCompressor.maxCompressedLength(srcLen));
             writeIntLE(compressedLength, dst, dstOff);
             writeIntLE(srcLen, dst, dstOff + 4);
             return HEADER_LENGTH + compressedLength;
-        } catch (LZ4Exception | BufferOverflowException | ArrayIndexOutOfBoundsException e) {
-            throw new InsufficientBufferException(e);
+        } catch (Exception e) {
+            throw new BufferCompressionException(e);
         }
     }
-
-    private static void writeIntLE(int i, byte[] buf, int offset) {
-        buf[offset++] = (byte) i;
-        buf[offset++] = (byte) (i >>> 8);
-        buf[offset++] = (byte) (i >>> 16);
-        buf[offset] = (byte) (i >>> 24);
-    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockDecompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockDecompressor.java
new file mode 100644
index 00000000000..765b9f98875
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirBlockDecompressor.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.compression;
+
+import io.airlift.compress.Decompressor;
+import io.airlift.compress.MalformedInputException;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
+
+/** Flink decompressor that wraps {@link Decompressor}. */
+public class AirBlockDecompressor implements BlockDecompressor {
+    private final Decompressor internalDecompressor;
+
+    public AirBlockDecompressor(Decompressor internalDecompressor) {
+        this.internalDecompressor = internalDecompressor;
+    }
+
+    @Override
+    public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
+            throws BufferDecompressionException {
+        final int prevSrcOff = src.position() + srcOff;
+        final int prevDstOff = dst.position() + dstOff;
+
+        src.position(prevSrcOff);
+        dst.position(prevDstOff);
+        src.order(ByteOrder.LITTLE_ENDIAN);
+        final int compressedLen = src.getInt();
+        final int originalLen = src.getInt();
+        validateLength(compressedLen, originalLen);
+
+        if (dst.capacity() - prevDstOff < originalLen) {
+            throw new BufferDecompressionException("Buffer length too small");
+        }
+
+        if (src.limit() - prevSrcOff - HEADER_LENGTH < compressedLen) {
+            throw new BufferDecompressionException(
+                    "Source data is not integral for decompression.");
+        }
+        src.limit(prevSrcOff + compressedLen + HEADER_LENGTH);
+        try {
+            internalDecompressor.decompress(src, dst);
+            if (originalLen != dst.position() - prevDstOff) {
+                throw new BufferDecompressionException(
+                        "Input is corrupted, unexpected original length.");
+            }
+        } catch (MalformedInputException e) {
+            throw new BufferDecompressionException("Input is corrupted", e);
+        }
+
+        return originalLen;
+    }
+
+    @Override
+    public int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
+            throws BufferDecompressionException {
+        int compressedLen = readIntLE(src, srcOff);
+        int originalLen = readIntLE(src, srcOff + 4);
+        validateLength(compressedLen, originalLen);
+
+        if (dst.length - dstOff < originalLen) {
+            throw new BufferDecompressionException("Buffer length too small");
+        }
+
+        if (src.length - srcOff - HEADER_LENGTH < compressedLen) {
+            throw new BufferDecompressionException(
+                    "Source data is not integral for decompression.");
+        }
+
+        try {
+            final int decompressedLen =
+                    internalDecompressor.decompress(
+                            src, srcOff + HEADER_LENGTH, compressedLen, dst, dstOff, originalLen);
+            if (originalLen != decompressedLen) {
+                throw new BufferDecompressionException("Input is corrupted");
+            }
+        } catch (MalformedInputException e) {
+            throw new BufferDecompressionException("Input is corrupted", e);
+        }
+
+        return originalLen;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressorFactory.java
similarity index 58%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressorFactory.java
index dec76400c4d..ab68abcf11a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/AirCompressorFactory.java
@@ -18,22 +18,29 @@
 
 package org.apache.flink.runtime.io.compression;
 
-/** Implementation of {@link BlockCompressionFactory} for Lz4 codec. */
-public class Lz4BlockCompressionFactory implements BlockCompressionFactory {
+import io.airlift.compress.Compressor;
+import io.airlift.compress.Decompressor;
 
-    /**
-     * We put two integers before each compressed block, the first integer represents the compressed
-     * length of the block, and the second one represents the original length of the block.
-     */
-    public static final int HEADER_LENGTH = 8;
+/**
+ * {@link BlockCompressionFactory} to create wrapped {@link Compressor} and {@link Decompressor}.
+ */
+public class AirCompressorFactory implements BlockCompressionFactory {
+    private final Compressor internalCompressor;
+
+    private final Decompressor internalDecompressor;
+
+    public AirCompressorFactory(Compressor internalCompressor, Decompressor internalDecompressor) {
+        this.internalCompressor = internalCompressor;
+        this.internalDecompressor = internalDecompressor;
+    }
 
     @Override
     public BlockCompressor getCompressor() {
-        return new Lz4BlockCompressor();
+        return new AirBlockCompressor(internalCompressor);
     }
 
     @Override
     public BlockDecompressor getDecompressor() {
-        return new Lz4BlockDecompressor();
+        return new AirBlockDecompressor(internalDecompressor);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressionFactory.java
index 587c17ffcd3..145578e036f 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressionFactory.java
@@ -20,6 +20,11 @@ package org.apache.flink.runtime.io.compression;
 
 import org.apache.flink.configuration.IllegalConfigurationException;
 
+import io.airlift.compress.lzo.LzoCompressor;
+import io.airlift.compress.lzo.LzoDecompressor;
+import io.airlift.compress.zstd.ZstdCompressor;
+import io.airlift.compress.zstd.ZstdDecompressor;
+
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
 /**
@@ -34,7 +39,9 @@ public interface BlockCompressionFactory {
 
     /** Name of {@link BlockCompressionFactory}. */
     enum CompressionFactoryName {
-        LZ4
+        LZ4,
+        LZO,
+        ZSTD
     }
 
     /**
@@ -54,12 +61,20 @@ public interface BlockCompressionFactory {
             compressionName = null;
         }
 
-        BlockCompressionFactory blockCompressionFactory = null;
+        BlockCompressionFactory blockCompressionFactory;
         if (compressionName != null) {
             switch (compressionName) {
                 case LZ4:
                     blockCompressionFactory = new Lz4BlockCompressionFactory();
                     break;
+                case LZO:
+                    blockCompressionFactory =
+                            new AirCompressorFactory(new LzoCompressor(), new LzoDecompressor());
+                    break;
+                case ZSTD:
+                    blockCompressionFactory =
+                            new AirCompressorFactory(new ZstdCompressor(), new ZstdDecompressor());
+                    break;
                 default:
                     throw new IllegalStateException("Unknown CompressionMethod " + compressionName);
             }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressor.java
index edcc5468c0f..1d83f0ab8d2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockCompressor.java
@@ -39,10 +39,10 @@ public interface BlockCompressor {
      * @param dst The target to write compressed data
      * @param dstOff The start offset to write the compressed data
      * @return Length of compressed data
-     * @throws InsufficientBufferException if the target does not have sufficient space
+     * @throws BufferCompressionException if exception thrown when compressing
      */
     int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
-            throws InsufficientBufferException;
+            throws BufferCompressionException;
 
     /**
      * Compress data read from src, and write the compressed data to dst.
@@ -53,8 +53,8 @@ public interface BlockCompressor {
      * @param dst The target to write compressed data
      * @param dstOff The start offset to write the compressed data
      * @return Length of compressed data
-     * @throws InsufficientBufferException if the target does not have sufficient space
+     * @throws BufferCompressionException if exception thrown when compressing
      */
     int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
-            throws InsufficientBufferException;
+            throws BufferCompressionException;
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockDecompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockDecompressor.java
index a52d787db81..ba6b79f4eaf 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockDecompressor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BlockDecompressor.java
@@ -33,11 +33,10 @@ public interface BlockDecompressor {
      * @param dst The target to write decompressed data
      * @param dstOff The start offset to write the decompressed data
      * @return Length of decompressed data
-     * @throws DataCorruptionException if data corruption found when decompressing
-     * @throws InsufficientBufferException if the target does not have sufficient space
+     * @throws BufferDecompressionException if exception thrown when decompressing
      */
     int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
-            throws DataCorruptionException, InsufficientBufferException;
+            throws BufferDecompressionException;
 
     /**
      * Decompress source data read from src and write the decompressed data to dst.
@@ -48,9 +47,8 @@ public interface BlockDecompressor {
      * @param dst The target to write decompressed data
      * @param dstOff The start offset to write the decompressed data
      * @return Length of decompressed data
-     * @throws DataCorruptionException if data corruption found when decompressing
-     * @throws InsufficientBufferException if the target does not have sufficient space
+     * @throws BufferDecompressionException if exception thrown when decompressing
      */
     int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
-            throws DataCorruptionException, InsufficientBufferException;
+            throws BufferDecompressionException;
 }
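
Putting the factory and the re-typed interfaces above together, a round-trip
might look like the following sketch (not part of this commit; error handling
elided):

    BlockCompressionFactory factory =
            BlockCompressionFactory.createBlockCompressionFactory("ZSTD");
    BlockCompressor compressor = factory.getCompressor();
    BlockDecompressor decompressor = factory.getDecompressor();

    byte[] data = new byte[4096];    // data to compress
    byte[] compressed = new byte[compressor.getMaxCompressedSize(data.length)];
    int compressedLen = compressor.compress(data, 0, data.length, compressed, 0);

    byte[] restored = new byte[data.length];
    // Throws BufferDecompressionException on corrupted input or a too-small target.
    int restoredLen = decompressor.decompress(compressed, 0, compressedLen, restored, 0);
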
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/DataCorruptionException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferCompressionException.java
similarity index 68%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/DataCorruptionException.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferCompressionException.java
index 9169cebfd98..dc2461cc06a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/DataCorruptionException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferCompressionException.java
@@ -19,24 +19,24 @@
 package org.apache.flink.runtime.io.compression;
 
 /**
- * A {@code DataCorruptionException} is thrown when the decompressed data is corrupted and cannot be
- * decompressed.
+ * A {@code BufferCompressionException} is thrown when the target data cannot be compressed, such as
+ * insufficient target buffer space for compression, etc.
  */
-public class DataCorruptionException extends RuntimeException {
+public class BufferCompressionException extends RuntimeException {
 
-    public DataCorruptionException() {
+    public BufferCompressionException() {
         super();
     }
 
-    public DataCorruptionException(String message) {
+    public BufferCompressionException(String message) {
         super(message);
     }
 
-    public DataCorruptionException(String message, Throwable e) {
+    public BufferCompressionException(String message, Throwable e) {
         super(message, e);
     }
 
-    public DataCorruptionException(Throwable e) {
+    public BufferCompressionException(Throwable e) {
         super(e);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/InsufficientBufferException.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferDecompressionException.java
similarity index 65%
rename from flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/InsufficientBufferException.java
rename to flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferDecompressionException.java
index b67c7f4362b..65be2b1aacb 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/InsufficientBufferException.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/BufferDecompressionException.java
@@ -19,25 +19,24 @@
 package org.apache.flink.runtime.io.compression;
 
 /**
- * An {@code InsufficientBufferException} is thrown when there is no enough buffer to serialize or
- * deserialize a buffer to another buffer. When such exception being caught, user may enlarge the
- * output buffer and try again.
+ * A {@code BufferDecompressionException} is thrown when the target data cannot be decompressed,
+ * such as data corruption, insufficient target buffer space for decompression, etc.
  */
-public class InsufficientBufferException extends RuntimeException {
+public class BufferDecompressionException extends RuntimeException {
 
-    public InsufficientBufferException() {
+    public BufferDecompressionException() {
         super();
     }
 
-    public InsufficientBufferException(String message) {
+    public BufferDecompressionException(String message) {
         super(message);
     }
 
-    public InsufficientBufferException(String message, Throwable e) {
+    public BufferDecompressionException(String message, Throwable e) {
         super(message, e);
     }
 
-    public InsufficientBufferException(Throwable e) {
+    public BufferDecompressionException(Throwable e) {
         super(e);
     }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/CompressorUtils.java
similarity index 54%
copy from flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java
copy to flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/CompressorUtils.java
index dec76400c4d..f531bbed4f2 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/CompressorUtils.java
@@ -18,22 +18,35 @@
 
 package org.apache.flink.runtime.io.compression;
 
-/** Implementation of {@link BlockCompressionFactory} for Lz4 codec. */
-public class Lz4BlockCompressionFactory implements BlockCompressionFactory {
-
+/** Utils for {@link BlockCompressor}. */
+public class CompressorUtils {
     /**
      * We put two integers before each compressed block, the first integer represents the compressed
      * length of the block, and the second one represents the original length of the block.
      */
     public static final int HEADER_LENGTH = 8;
 
-    @Override
-    public BlockCompressor getCompressor() {
-        return new Lz4BlockCompressor();
+    public static void writeIntLE(int i, byte[] buf, int offset) {
+        buf[offset++] = (byte) i;
+        buf[offset++] = (byte) (i >>> 8);
+        buf[offset++] = (byte) (i >>> 16);
+        buf[offset] = (byte) (i >>> 24);
+    }
+
+    public static int readIntLE(byte[] buf, int i) {
+        return (buf[i] & 0xFF)
+                | ((buf[i + 1] & 0xFF) << 8)
+                | ((buf[i + 2] & 0xFF) << 16)
+                | ((buf[i + 3] & 0xFF) << 24);
     }
 
-    @Override
-    public BlockDecompressor getDecompressor() {
-        return new Lz4BlockDecompressor();
+    public static void validateLength(int compressedLen, int originalLen)
+            throws BufferDecompressionException {
+        if (originalLen < 0
+                || compressedLen < 0
+                || (originalLen == 0 && compressedLen != 0)
+                || (originalLen != 0 && compressedLen == 0)) {
+            throw new BufferDecompressionException("Input is corrupted, invalid length.");
+        }
     }
 }
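
As a worked example of the little-endian header helpers above (each compressed
block is prefixed by compressedLen and originalLen, four bytes each):

    byte[] header = new byte[CompressorUtils.HEADER_LENGTH];
    CompressorUtils.writeIntLE(0x01020304, header, 0);
    // header[0..3] is now {0x04, 0x03, 0x02, 0x01}, least significant byte first.
    int roundTripped = CompressorUtils.readIntLE(header, 0);    // == 0x01020304
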
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java
index dec76400c4d..7e21979be82 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressionFactory.java
@@ -20,13 +20,6 @@ package org.apache.flink.runtime.io.compression;
 
 /** Implementation of {@link BlockCompressionFactory} for Lz4 codec. */
 public class Lz4BlockCompressionFactory implements BlockCompressionFactory {
-
-    /**
-     * We put two integers before each compressed block, the first integer represents the compressed
-     * length of the block, and the second one represents the original length of the block.
-     */
-    public static final int HEADER_LENGTH = 8;
-
     @Override
     public BlockCompressor getCompressor() {
         return new Lz4BlockCompressor();
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressor.java
index 86607c73d86..158afafeefd 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockCompressor.java
@@ -19,14 +19,13 @@
 package org.apache.flink.runtime.io.compression;
 
 import net.jpountz.lz4.LZ4Compressor;
-import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
 
-import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.writeIntLE;
 
 /**
  * Encode data into LZ4 format (not compatible with the LZ4 Frame format). It reads from and writes
@@ -49,7 +48,7 @@ public class Lz4BlockCompressor implements BlockCompressor {
 
     @Override
     public int compress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
-            throws InsufficientBufferException {
+            throws BufferCompressionException {
         try {
             final int prevSrcOff = src.position() + srcOff;
             final int prevDstOff = dst.position() + dstOff;
@@ -73,29 +72,22 @@ public class Lz4BlockCompressor implements BlockCompressor {
             dst.position(prevDstOff + compressedLength + HEADER_LENGTH);
 
             return HEADER_LENGTH + compressedLength;
-        } catch (LZ4Exception | ArrayIndexOutOfBoundsException | BufferOverflowException e) {
-            throw new InsufficientBufferException(e);
+        } catch (Exception e) {
+            throw new BufferCompressionException(e);
         }
     }
 
     @Override
     public int compress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
-            throws InsufficientBufferException {
+            throws BufferCompressionException {
         try {
             int compressedLength =
                     compressor.compress(src, srcOff, srcLen, dst, dstOff + HEADER_LENGTH);
             writeIntLE(compressedLength, dst, dstOff);
             writeIntLE(srcLen, dst, dstOff + 4);
             return HEADER_LENGTH + compressedLength;
-        } catch (LZ4Exception | BufferOverflowException | ArrayIndexOutOfBoundsException e) {
-            throw new InsufficientBufferException(e);
+        } catch (Exception e) {
+            throw new BufferCompressionException(e);
         }
     }
-
-    private static void writeIntLE(int i, byte[] buf, int offset) {
-        buf[offset++] = (byte) i;
-        buf[offset++] = (byte) (i >>> 8);
-        buf[offset++] = (byte) (i >>> 16);
-        buf[offset] = (byte) (i >>> 24);
-    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockDecompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockDecompressor.java
index f60014d8fd0..fd73ddce12d 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockDecompressor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/compression/Lz4BlockDecompressor.java
@@ -21,12 +21,13 @@ package org.apache.flink.runtime.io.compression;
 import net.jpountz.lz4.LZ4Exception;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4FastDecompressor;
-import net.jpountz.util.SafeUtils;
 
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.readIntLE;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.validateLength;
 
 /**
  * Decode data written with {@link Lz4BlockCompressor}. It reads from and writes to byte arrays
@@ -44,7 +45,7 @@ public class Lz4BlockDecompressor implements BlockDecompressor {
 
     @Override
     public int decompress(ByteBuffer src, int srcOff, int srcLen, ByteBuffer dst, int dstOff)
-            throws DataCorruptionException {
+            throws BufferDecompressionException {
         final int prevSrcOff = src.position() + srcOff;
         final int prevDstOff = dst.position() + dstOff;
 
@@ -54,11 +55,12 @@ public class Lz4BlockDecompressor implements BlockDecompressor {
         validateLength(compressedLen, originalLen);
 
         if (dst.capacity() - prevDstOff < originalLen) {
-            throw new InsufficientBufferException("Buffer length too small");
+            throw new BufferDecompressionException("Buffer length too small");
         }
 
         if (src.limit() - prevSrcOff - HEADER_LENGTH < compressedLen) {
-            throw new DataCorruptionException("Source data is not integral for decompression.");
+            throw new BufferDecompressionException(
+                    "Source data is not integral for decompression.");
         }
 
         try {
@@ -66,13 +68,13 @@ public class Lz4BlockDecompressor implements BlockDecompressor {
                     decompressor.decompress(
                             src, prevSrcOff + HEADER_LENGTH, dst, prevDstOff, originalLen);
             if (compressedLen != compressedLen2) {
-                throw new DataCorruptionException(
+                throw new BufferDecompressionException(
                         "Input is corrupted, unexpected compressed length.");
             }
             src.position(prevSrcOff + compressedLen + HEADER_LENGTH);
             dst.position(prevDstOff + originalLen);
         } catch (LZ4Exception e) {
-            throw new DataCorruptionException("Input is corrupted", e);
+            throw new BufferDecompressionException("Input is corrupted", e);
         }
 
         return originalLen;
@@ -80,38 +82,30 @@ public class Lz4BlockDecompressor implements BlockDecompressor {
 
     @Override
     public int decompress(byte[] src, int srcOff, int srcLen, byte[] dst, int dstOff)
-            throws InsufficientBufferException, DataCorruptionException {
-        final int compressedLen = SafeUtils.readIntLE(src, srcOff);
-        final int originalLen = SafeUtils.readIntLE(src, srcOff + 4);
+            throws BufferDecompressionException {
+        final int compressedLen = readIntLE(src, srcOff);
+        final int originalLen = readIntLE(src, srcOff + 4);
         validateLength(compressedLen, originalLen);
 
         if (dst.length - dstOff < originalLen) {
-            throw new InsufficientBufferException("Buffer length too small");
+            throw new BufferDecompressionException("Buffer length too small");
         }
 
         if (src.length - srcOff - HEADER_LENGTH < compressedLen) {
-            throw new DataCorruptionException("Source data is not integral for decompression.");
+            throw new BufferDecompressionException(
+                    "Source data is not integral for decompression.");
         }
 
         try {
             final int compressedLen2 =
                     decompressor.decompress(src, srcOff + HEADER_LENGTH, dst, dstOff, originalLen);
             if (compressedLen != compressedLen2) {
-                throw new DataCorruptionException("Input is corrupted");
+                throw new BufferDecompressionException("Input is corrupted");
             }
         } catch (LZ4Exception e) {
-            throw new DataCorruptionException("Input is corrupted", e);
+            throw new BufferDecompressionException("Input is corrupted", e);
         }
 
         return originalLen;
     }
-
-    private void validateLength(int compressedLen, int originalLen) throws DataCorruptionException {
-        if (originalLen < 0
-                || compressedLen < 0
-                || (originalLen == 0 && compressedLen != 0)
-                || (originalLen != 0 && compressedLen == 0)) {
-            throw new DataCorruptionException("Input is corrupted, invalid length.");
-        }
-    }
 }
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java
index 4a58ad2c08c..07b245f2b74 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferCompressor.java
@@ -36,16 +36,20 @@ public class BufferCompressor {
     /** The intermediate buffer for the compressed data. */
     private final NetworkBuffer internalBuffer;
 
+    /** The backup array of intermediate buffer. */
+    private final byte[] internalBufferArray;
+
     public BufferCompressor(int bufferSize, String factoryName) {
         checkArgument(bufferSize > 0);
         checkNotNull(factoryName);
         // the size of this intermediate heap buffer will be gotten from the
         // plugin configuration in the future, and currently, double size of
-        // the input buffer is enough for lz4-java compression library.
-        final byte[] heapBuffer = new byte[2 * bufferSize];
+        // the input buffer is enough for the compression libraries used.
+        this.internalBufferArray = new byte[2 * bufferSize];
         this.internalBuffer =
                 new NetworkBuffer(
-                        MemorySegmentFactory.wrap(heapBuffer), FreeingBufferRecycler.INSTANCE);
+                        MemorySegmentFactory.wrap(internalBufferArray),
+                        FreeingBufferRecycler.INSTANCE);
         this.blockCompressor =
                 BlockCompressionFactory.createBlockCompressionFactory(factoryName).getCompressor();
     }
@@ -86,7 +90,7 @@ public class BufferCompressor {
         // copy the compressed data back
         int memorySegmentOffset = buffer.getMemorySegmentOffset();
         MemorySegment segment = buffer.getMemorySegment();
-        segment.put(memorySegmentOffset, internalBuffer.array(), 0, compressedLen);
+        segment.put(memorySegmentOffset, internalBufferArray, 0, compressedLen);
 
         return new ReadOnlySlicedNetworkBuffer(
                 buffer.asByteBuf(), 0, compressedLen, memorySegmentOffset, true);
@@ -107,15 +111,34 @@ public class BufferCompressor {
                 "Illegal reference count, buffer need to be released.");
 
         try {
+            int compressedLen;
             int length = buffer.getSize();
-            // compress the given buffer into the internal heap buffer
-            int compressedLen =
-                    blockCompressor.compress(
-                            buffer.getNioBuffer(0, length),
-                            0,
-                            length,
-                            internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                            0);
+            MemorySegment memorySegment = buffer.getMemorySegment();
+            // If buffer is on-heap, manipulate the underlying array directly. There are two main
+            // reasons why NIO buffer is not directly used here: One is that some compression
+            // libraries will use the underlying array for heap buffer, but our input buffer may be
+            // a read-only ByteBuffer, and it is illegal to access its internal array. Another reason
+            // is that for the on-heap buffer, directly operating the underlying array can reduce
+            // additional overhead compared to generating a NIO buffer.
+            if (!memorySegment.isOffHeap()) {
+                compressedLen =
+                        blockCompressor.compress(
+                                memorySegment.getArray(),
+                                buffer.getMemorySegmentOffset(),
+                                length,
+                                internalBufferArray,
+                                0);
+            } else {
+                // compress the given buffer into the internal heap buffer
+                compressedLen =
+                        blockCompressor.compress(
+                                buffer.getNioBuffer(0, length),
+                                0,
+                                length,
+                                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
+                                0);
+            }
+
             return compressedLen < length ? compressedLen : 0;
         } catch (Throwable throwable) {
             // return the original buffer if failed to compress
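
The read-only case mentioned in the comment above is plain JDK behavior and easy
to reproduce in isolation (a sketch, not Flink code):

    java.nio.ByteBuffer heap = java.nio.ByteBuffer.allocate(16);
    java.nio.ByteBuffer readOnly = heap.asReadOnlyBuffer();
    readOnly.array();    // throws java.nio.ReadOnlyBufferException
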
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java
index c6dc89a407a..b99f5d05413 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferDecompressor.java
@@ -37,15 +37,19 @@ public class BufferDecompressor {
     /** The intermediate buffer for the decompressed data. */
     private final NetworkBuffer internalBuffer;
 
+    /** The backup array of intermediate buffer. */
+    private final byte[] internalBufferArray;
+
     public BufferDecompressor(int bufferSize, String factoryName) {
         checkArgument(bufferSize > 0);
         checkNotNull(factoryName);
 
         // the decompressed data size should be never larger than the configured buffer size
-        final byte[] heapBuffer = new byte[bufferSize];
+        this.internalBufferArray = new byte[bufferSize];
         this.internalBuffer =
                 new NetworkBuffer(
-                        MemorySegmentFactory.wrap(heapBuffer), FreeingBufferRecycler.INSTANCE);
+                        MemorySegmentFactory.wrap(internalBufferArray),
+                        FreeingBufferRecycler.INSTANCE);
         this.blockDecompressor =
                 BlockCompressionFactory.createBlockCompressionFactory(factoryName)
                         .getDecompressor();
@@ -82,7 +86,7 @@ public class BufferDecompressor {
         // copy the decompressed data back
         int memorySegmentOffset = buffer.getMemorySegmentOffset();
         MemorySegment segment = buffer.getMemorySegment();
-        segment.put(memorySegmentOffset, internalBuffer.array(), 0, decompressedLen);
+        segment.put(memorySegmentOffset, internalBufferArray, 0, decompressedLen);
 
         return new ReadOnlySlicedNetworkBuffer(
                 buffer.asByteBuf(), 0, decompressedLen, memorySegmentOffset, false);
@@ -103,12 +107,28 @@ public class BufferDecompressor {
                 "Illegal reference count, buffer need to be released.");
 
         int length = buffer.getSize();
-        // decompress the given buffer into the internal heap buffer
-        return blockDecompressor.decompress(
-                buffer.getNioBuffer(0, length),
-                0,
-                length,
-                internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
-                0);
+        MemorySegment memorySegment = buffer.getMemorySegment();
+        // If buffer is on-heap, manipulate the underlying array directly. There are two main
+        // reasons why NIO buffer is not directly used here: One is that some compression
+        // libraries will use the underlying array for heap buffer, but our input buffer may be
+        // a read-only ByteBuffer, and it is illegal to access its internal array. Another reason
+        // is that for the on-heap buffer, directly operating the underlying array can reduce
+        // additional overhead compared to generating a NIO buffer.
+        if (!memorySegment.isOffHeap()) {
+            return blockDecompressor.decompress(
+                    memorySegment.getArray(),
+                    buffer.getMemorySegmentOffset(),
+                    length,
+                    internalBufferArray,
+                    0);
+        } else {
+            // decompress the given buffer into the internal heap buffer
+            return blockDecompressor.decompress(
+                    buffer.getNioBuffer(0, length),
+                    0,
+                    length,
+                    internalBuffer.getNioBuffer(0, internalBuffer.capacity()),
+                    0);
+        }
     }
 }
diff --git a/flink-runtime/src/main/resources/META-INF/NOTICE b/flink-runtime/src/main/resources/META-INF/NOTICE
new file mode 100644
index 00000000000..80ac2a27de1
--- /dev/null
+++ b/flink-runtime/src/main/resources/META-INF/NOTICE
@@ -0,0 +1,9 @@
+flink-runtime
+Copyright 2014-2021 The Apache Software Foundation
+
+This product includes software developed at
+The Apache Software Foundation (http://www.apache.org/).
+
+This project bundles the following dependencies under the Apache Software License 2.0. (http://www.apache.org/licenses/LICENSE-2.0.txt)
+
+- io.airlift:aircompressor:0.21
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java
index fd8a05db6f9..efa59aad27f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/compression/BlockCompressionTest.java
@@ -18,20 +18,28 @@
 
 package org.apache.flink.runtime.io.compression;
 
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.MethodSource;
 
 import java.nio.ByteBuffer;
+import java.util.stream.Stream;
 
-import static org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory.HEADER_LENGTH;
+import static org.apache.flink.runtime.io.compression.CompressorUtils.HEADER_LENGTH;
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** Tests for block compression. */
 class BlockCompressionTest {
+    private static Stream<BlockCompressionFactory> compressCodecGenerator() {
+        return Stream.of(
+                BlockCompressionFactory.createBlockCompressionFactory("LZ4"),
+                BlockCompressionFactory.createBlockCompressionFactory("LZO"),
+                BlockCompressionFactory.createBlockCompressionFactory("ZSTD"));
+    }
 
-    @Test
-    void testLz4() {
-        BlockCompressionFactory factory = new Lz4BlockCompressionFactory();
+    @ParameterizedTest
+    @MethodSource("compressCodecGenerator")
+    void testBlockCompression(BlockCompressionFactory factory) {
         runArrayTest(factory, 32768);
         runArrayTest(factory, 16);
 
@@ -63,7 +71,7 @@ class BlockCompressionTest {
                                         originalLen,
                                         insufficientCompressArray,
                                         compressedOff))
-                .isInstanceOf(InsufficientBufferException.class);
+                .isInstanceOf(BufferCompressionException.class);
 
         // 2. test normal compress
         byte[] compressedData =
@@ -83,7 +91,7 @@ class BlockCompressionTest {
                                         compressedLen,
                                         insufficientDecompressArray,
                                         decompressedOff))
-                .isInstanceOf(InsufficientBufferException.class);
+                .isInstanceOf(BufferDecompressionException.class);
 
         // 4. test normal decompress
         byte[] decompressedData = new byte[decompressedOff + originalLen];
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferCompressionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferCompressionTest.java
index c7dbe04ff5e..d269151205b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferCompressionTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/BufferCompressionTest.java
@@ -68,6 +68,18 @@ public class BufferCompressionTest {
                     {false, "LZ4", true, false},
                     {false, "LZ4", false, true},
                     {false, "LZ4", false, false},
+                    {true, "ZSTD", true, false},
+                    {true, "ZSTD", false, true},
+                    {true, "ZSTD", false, false},
+                    {false, "ZSTD", true, false},
+                    {false, "ZSTD", false, true},
+                    {false, "ZSTD", false, false},
+                    {true, "LZO", true, false},
+                    {true, "LZO", false, true},
+                    {true, "LZO", false, false},
+                    {false, "LZO", true, false},
+                    {false, "LZO", false, true},
+                    {false, "LZO", false, false}
                 });
     }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 8ad892b1d7a..1ea783d3e34 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -58,6 +58,8 @@ import org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors;
 
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 
@@ -197,10 +199,10 @@ class CreditBasedPartitionRequestClientHandlerTest {
     /**
      * Verifies that {@link BufferResponse} of compressed {@link Buffer} can be handled correctly.
      */
-    @Test
-    void testReceiveCompressedBuffer() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
+    void testReceiveCompressedBuffer(final String compressionCodec) throws Exception {
         int bufferSize = 1024;
-        String compressionCodec = "LZ4";
         BufferCompressor compressor = new BufferCompressor(bufferSize, compressionCodec);
         BufferDecompressor decompressor = new BufferDecompressor(bufferSize, compressionCodec);
         NetworkBufferPool networkBufferPool = new NetworkBufferPool(10, bufferSize);
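
The compressor/decompressor pair built above operates on network Buffer instances rather than raw arrays. A minimal sketch of that path follows, using the compressToOriginalBuffer()/decompressToOriginalBuffer() calls and the NetworkBuffer constructor that appear elsewhere in this commit; note that the compressor can hand back the original buffer when compression does not shrink the data, hence the isCompressed() check.

    MemorySegment segment = MemorySegmentFactory.allocateUnpooledSegment(1024);
    NetworkBuffer buffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
    buffer.asByteBuf().writeZero(1024); // highly compressible payload

    BufferCompressor compressor = new BufferCompressor(1024, "LZO");
    Buffer maybeCompressed = compressor.compressToOriginalBuffer(buffer);
    if (maybeCompressed.isCompressed()) {
        BufferDecompressor decompressor = new BufferDecompressor(1024, "LZO");
        Buffer restored = decompressor.decompressToOriginalBuffer(maybeCompressed);
    }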
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
index 925a1444c32..b6927e5c1ec 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NettyMessageClientSideSerializationTest.java
@@ -38,6 +38,8 @@ import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.api.extension.ExtendWith;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.util.Random;
@@ -63,12 +65,10 @@ class NettyMessageClientSideSerializationTest {
 
     private static final int BUFFER_SIZE = 1024;
 
-    private static final BufferCompressor COMPRESSOR = new BufferCompressor(BUFFER_SIZE, "LZ4");
-
-    private static final BufferDecompressor DECOMPRESSOR =
-            new BufferDecompressor(BUFFER_SIZE, "LZ4");
-
     private final Random random = new Random();
+    private static BufferCompressor compressor;
+
+    private static BufferDecompressor decompressor;
 
     private EmbeddedChannel channel;
 
@@ -143,8 +143,11 @@ class NettyMessageClientSideSerializationTest {
         testBufferResponse(true, false);
     }
 
-    @Test
-    void testCompressedBufferResponse() {
+    @ParameterizedTest
+    @ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
+    void testCompressedBufferResponse(final String codecFactoryName) {
+        compressor = new BufferCompressor(BUFFER_SIZE, codecFactoryName);
+        decompressor = new BufferDecompressor(BUFFER_SIZE, codecFactoryName);
         testBufferResponse(false, true);
     }
 
@@ -178,7 +181,7 @@ class NettyMessageClientSideSerializationTest {
         if (testReadOnlyBuffer) {
             testBuffer = buffer.readOnlySlice();
         } else if (testCompressedBuffer) {
-            testBuffer = COMPRESSOR.compressToOriginalBuffer(buffer);
+            testBuffer = compressor.compressToOriginalBuffer(buffer);
         }
 
         BufferResponse expected =
@@ -221,6 +224,6 @@ class NettyMessageClientSideSerializationTest {
         Buffer compressedBuffer = new NetworkBuffer(segment, FreeingBufferRecycler.INSTANCE);
         buffer.asByteBuf().readBytes(compressedBuffer.asByteBuf(), buffer.readableBytes());
         compressedBuffer.setCompressed(true);
-        return DECOMPRESSOR.decompressToOriginalBuffer(compressedBuffer);
+        return decompressor.decompressToOriginalBuffer(compressedBuffer);
     }
 }
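
A small design note on the hunk above: the static compressor/decompressor fields are reassigned inside the parameterized method, which is safe only because JUnit executes the test methods sequentially. The same coverage is possible without mutable static state, e.g. by threading the pair through the helper; in this sketch the four-argument testBufferResponse overload is hypothetical.

    @ParameterizedTest
    @ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
    void testCompressedBufferResponse(final String codec) {
        // Locals instead of reassigned statics: no cross-test interference.
        testBufferResponse(
                false,
                true,
                new BufferCompressor(BUFFER_SIZE, codec),
                new BufferDecompressor(BUFFER_SIZE, codec)); // hypothetical overload
    }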
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 44249a57bfa..c5757cba4b2 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -73,6 +73,8 @@ import org.apache.flink.util.CompressedSerializedValue;
 import org.apache.flink.shaded.guava30.com.google.common.io.Closer;
 
 import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
@@ -326,10 +328,10 @@ public class SingleInputGateTest extends InputGateTestBase {
      * Tests that the compressed buffer will be decompressed after calling {@link
      * SingleInputGate#getNext()}.
      */
-    @Test
-    void testGetCompressedBuffer() throws Exception {
+    @ParameterizedTest
+    @ValueSource(strings = {"LZ4", "LZO", "ZSTD"})
+    void testGetCompressedBuffer(final String compressionCodec) throws Exception {
         int bufferSize = 1024;
-        String compressionCodec = "LZ4";
         BufferCompressor compressor = new BufferCompressor(bufferSize, compressionCodec);
         BufferDecompressor decompressor = new BufferDecompressor(bufferSize, compressionCodec);
 
diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
index 916f560d33e..bb1ba9beba9 100644
--- a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
+++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/io/CompressedHeaderlessChannelTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.runtime.io;
 
 import org.apache.flink.runtime.io.compression.BlockCompressionFactory;
-import org.apache.flink.runtime.io.compression.Lz4BlockCompressionFactory;
 import org.apache.flink.runtime.io.disk.iomanager.BufferFileWriter;
 import org.apache.flink.runtime.io.disk.iomanager.FileIOChannel;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
@@ -27,6 +26,8 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
 
 import org.junit.After;
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.io.IOException;
 import java.util.Random;
@@ -37,12 +38,22 @@ import static org.assertj.core.api.Assertions.assertThat;
  * Tests for {@link CompressedHeaderlessChannelReaderInputView} and {@link
  * CompressedHeaderlessChannelWriterOutputView}.
  */
+@RunWith(Parameterized.class)
 public class CompressedHeaderlessChannelTest {
     private static final int BUFFER_SIZE = 256;
 
     private IOManager ioManager;
 
-    private BlockCompressionFactory compressionFactory = new Lz4BlockCompressionFactory();
+    @Parameterized.Parameter public static BlockCompressionFactory compressionFactory;
+
+    @Parameterized.Parameters(name = "compressionFactory = {0}")
+    public static BlockCompressionFactory[] compressionFactory() {
+        return new BlockCompressionFactory[] {
+            BlockCompressionFactory.createBlockCompressionFactory("LZ4"),
+            BlockCompressionFactory.createBlockCompressionFactory("LZO"),
+            BlockCompressionFactory.createBlockCompressionFactory("ZSTD")
+        };
+    }
 
     public CompressedHeaderlessChannelTest() {
         ioManager = new IOManagerAsync();
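
One caveat on this last file: unlike the JUnit 5 tests above, it still runs on JUnit 4, and the @Parameterized.Parameter field is declared static even though the runner injects a value per test instance. That happens to work while instances run sequentially, but the conventional shape is constructor injection (or a non-static public field), sketched here.

    // Conventional alternative (sketch): let the Parameterized runner pass
    // the factory to the constructor so each instance owns its own copy.
    @RunWith(Parameterized.class)
    public class CompressedHeaderlessChannelTest {

        private final BlockCompressionFactory compressionFactory;

        public CompressedHeaderlessChannelTest(BlockCompressionFactory compressionFactory) {
            this.compressionFactory = compressionFactory;
        }
    }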