You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@iotdb.apache.org by xi...@apache.org on 2023/04/17 09:36:03 UTC

[iotdb] branch rel/1.1 updated: [IOTDB-2569]Support ZSTD Compression (#9231) (#9630)

This is an automated email from the ASF dual-hosted git repository.

xingtanzjr pushed a commit to branch rel/1.1
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/rel/1.1 by this push:
     new e88103fe19 [IOTDB-2569]Support ZSTD Compression (#9231) (#9630)
e88103fe19 is described below

commit e88103fe19033628855de09113619df19cc2e8e7
Author: shuwenwei <55...@users.noreply.github.com>
AuthorDate: Mon Apr 17 17:35:56 2023 +0800

    [IOTDB-2569]Support ZSTD Compression (#9231) (#9630)
---
 LICENSE-binary                                     |  5 ++
 client-cpp/src/main/Session.h                      |  3 +-
 client-py/iotdb/utils/IoTDBConstants.py            |  1 +
 docs/UserGuide/Data-Concept/Compression.md         |  2 +
 docs/UserGuide/Ecosystem-Integration/NiFi-IoTDB.md |  2 +-
 docs/UserGuide/Reference/Common-Config-Manual.md   | 12 +--
 docs/zh/UserGuide/Data-Concept/Compression.md      |  1 +
 .../UserGuide/Ecosystem-Integration/NiFi-IoTDB.md  |  2 +-
 .../zh/UserGuide/Reference/Common-Config-Manual.md | 12 +--
 .../apache/iotdb/jdbc/IoTDBDatabaseMetadata.java   |  1 +
 .../resources/conf/iotdb-common.properties         |  2 +-
 pom.xml                                            |  6 ++
 tsfile/pom.xml                                     |  4 +
 .../iotdb/tsfile/common/conf/TSFileConfig.java     |  2 +-
 .../apache/iotdb/tsfile/compress/ICompressor.java  | 61 ++++++++++++++-
 .../iotdb/tsfile/compress/IUnCompressor.java       | 44 +++++++++++
 .../file/metadata/enums/CompressionType.java       |  6 +-
 .../apache/iotdb/tsfile/compress/CompressTest.java | 39 ++++++++++
 .../org/apache/iotdb/tsfile/compress/ZstdTest.java | 88 ++++++++++++++++++++++
 19 files changed, 274 insertions(+), 19 deletions(-)

diff --git a/LICENSE-binary b/LICENSE-binary
index 98454894e8..5e7a032cf6 100644
--- a/LICENSE-binary
+++ b/LICENSE-binary
@@ -279,6 +279,11 @@ org.ow2.asm:asm:5.0.4
 org.jline:jline:3.21.0
 
 
+BSD 2-Clause
+------------
+com.github.luben:zstd-jni:1.5.4-2
+
+
 MIT License
 ------------
 org.slf4j:slf4j-api
diff --git a/client-cpp/src/main/Session.h b/client-cpp/src/main/Session.h
index 7d25298228..c852da40f5 100644
--- a/client-cpp/src/main/Session.h
+++ b/client-cpp/src/main/Session.h
@@ -140,7 +140,8 @@ namespace CompressionType {
         SDT = (char) 4,
         PAA = (char) 5,
         PLA = (char) 6,
-        LZ4 = (char) 7
+        LZ4 = (char) 7,
+        ZSTD = (char) 8
     };
 }
 
diff --git a/client-py/iotdb/utils/IoTDBConstants.py b/client-py/iotdb/utils/IoTDBConstants.py
index f902ac6d58..2a03a9b923 100644
--- a/client-py/iotdb/utils/IoTDBConstants.py
+++ b/client-py/iotdb/utils/IoTDBConstants.py
@@ -82,6 +82,7 @@ class Compressor(Enum):
     PAA = 5
     PLA = 6
     LZ4 = 7
+    ZSTD = 8
 
     # this method is implemented to avoid the issue reported by:
     # https://bugs.python.org/issue30545
diff --git a/docs/UserGuide/Data-Concept/Compression.md b/docs/UserGuide/Data-Concept/Compression.md
index b635348871..073d1a5365 100644
--- a/docs/UserGuide/Data-Concept/Compression.md
+++ b/docs/UserGuide/Data-Concept/Compression.md
@@ -35,6 +35,8 @@ IoTDB allows you to specify the compression method of the column when creating a
 
 * GZIP
 
+* ZSTD
+
 The specified syntax for compression is detailed in [Create Timeseries Statement](../Reference/SQL-Reference.md).
 
 ## Compression Ratio Statistics
diff --git a/docs/UserGuide/Ecosystem-Integration/NiFi-IoTDB.md b/docs/UserGuide/Ecosystem-Integration/NiFi-IoTDB.md
index 7f5dd94ec7..86e3a064ac 100644
--- a/docs/UserGuide/Ecosystem-Integration/NiFi-IoTDB.md
+++ b/docs/UserGuide/Ecosystem-Integration/NiFi-IoTDB.md
@@ -106,7 +106,7 @@ The structure of property `Schema`:
 4. The property `Prefix` will be added to tsName as the field name when add data to IoTDB.
 5. The supported `dataTypes` are `INT32`, `INT64`, `FLOAT`, `DOUBLE`, `BOOLEAN`, `TEXT`.
 6. The supported `encoding` are `PLAIN`, `DICTIONARY`, `RLE`, `DIFF`, `TS_2DIFF`, `BITMAP`, `GORILLA_V1`, `REGULAR`, `GORILLA`.
-7. The supported `compressionType` are `UNCOMPRESSED`, `SNAPPY`, `GZIP`, `LZO`, `SDT`, `PAA`, `PLA`, `LZ4`.
+7. The supported `compressionType` are `UNCOMPRESSED`, `SNAPPY`, `GZIP`, `LZO`, `SDT`, `PAA`, `PLA`, `LZ4`, `ZSTD`.
 
 ## Relationships
 
diff --git a/docs/UserGuide/Reference/Common-Config-Manual.md b/docs/UserGuide/Reference/Common-Config-Manual.md
index 095213a237..9b0ad80327 100644
--- a/docs/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/UserGuide/Reference/Common-Config-Manual.md
@@ -1221,12 +1221,12 @@ Different configuration parameters take effect in the following three ways:
 
 * compressor
 
-|    Name     | compressor                                    |
-| :---------: | :-------------------------------------------- |
-| Description | Data compression method                       |
-|    Type     | Enum String : “UNCOMPRESSED”, “SNAPPY”, "LZ4" |
-|   Default   | SNAPPY                                        |
-|  Effective  | hot-load                                       |
+|    Name     | compressor                                            |
+|:-----------:|:------------------------------------------------------|
+| Description | Data compression method                               |
+|    Type     | Enum String : “UNCOMPRESSED”, “SNAPPY”, "LZ4", "ZSTD" |
+|   Default   | SNAPPY                                                |
+|  Effective  | hot-load                                              |
 
 * bloomFilterErrorRate
 
diff --git a/docs/zh/UserGuide/Data-Concept/Compression.md b/docs/zh/UserGuide/Data-Concept/Compression.md
index cfd1251dbf..8dc472adc3 100644
--- a/docs/zh/UserGuide/Data-Concept/Compression.md
+++ b/docs/zh/UserGuide/Data-Concept/Compression.md
@@ -31,6 +31,7 @@ IoTDB 允许在创建一个时间序列的时候指定该列的压缩方式。
 * SNAPPY 压缩
 * LZ4 压缩
 * GZIP 压缩
+* ZSTD 压缩
 
 压缩方式的指定语法详见本文 [SQL 参考文档](../Reference/SQL-Reference.md)。
 
diff --git a/docs/zh/UserGuide/Ecosystem-Integration/NiFi-IoTDB.md b/docs/zh/UserGuide/Ecosystem-Integration/NiFi-IoTDB.md
index f51d2aba94..b2d5da1caf 100644
--- a/docs/zh/UserGuide/Ecosystem-Integration/NiFi-IoTDB.md
+++ b/docs/zh/UserGuide/Ecosystem-Integration/NiFi-IoTDB.md
@@ -106,7 +106,7 @@ Apache NiFi 包含以下功能:
 4. 当数据插入IoTDB时,Prefix属性会被添加到 tsName以作为插入的字段名。
 5. 支持的 `dataTypes` 有:`INT32`, `INT64`, `FLOAT`, `DOUBLE`, `BOOLEAN`, `TEXT`。
 6. 支持的 `encoding` 有: `PLAIN`, `DICTIONARY`, `RLE`, `DIFF`, `TS_2DIFF`, `BITMAP`, `GORILLA_V1`, `REGULAR`, `GORILLA`。
-7. 支持的 `compressionType` 有: `UNCOMPRESSED`, `SNAPPY`, `GZIP`, `LZO`, `SDT`, `PAA`, `PLA`, `LZ4`。
+7. 支持的 `compressionType` 有: `UNCOMPRESSED`, `SNAPPY`, `GZIP`, `LZO`, `SDT`, `PAA`, `PLA`, `LZ4`, `ZSTD`。
 
 ### Relationships
 
diff --git a/docs/zh/UserGuide/Reference/Common-Config-Manual.md b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
index 98efd6cc05..c7b7315b24 100644
--- a/docs/zh/UserGuide/Reference/Common-Config-Manual.md
+++ b/docs/zh/UserGuide/Reference/Common-Config-Manual.md
@@ -1260,12 +1260,12 @@ IoTDB ConfigNode 和 DataNode 的公共配置参数位于 `conf` 目录下。
 
 * compressor
 
-|     名字     | compressor                                    |
-| :----------: | :-------------------------------------------- |
-|     描述     | 数据压缩方法                                  |
-|     类型     | 枚举 String : “UNCOMPRESSED”, “SNAPPY”, “LZ4” |
-|    默认值    | SNAPPY                                        |
-| 改后生效方式 | 热加载                                      |
+|   名字   | compressor                                          |
+|:------:|:----------------------------------------------------|
+|   描述   | 数据压缩方法                                              |
+|   类型   | 枚举 String : “UNCOMPRESSED”, “SNAPPY”, “LZ4”, “ZSTD” |
+|  默认值   | SNAPPY                                              |
+| 改后生效方式 | 热加载                                                 |
 
 * max\_degree\_of\_index\_node
 
diff --git a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
index 5d1b56f211..3f2acc8a2b 100644
--- a/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
+++ b/jdbc/src/main/java/org/apache/iotdb/jdbc/IoTDBDatabaseMetadata.java
@@ -143,6 +143,7 @@ public class IoTDBDatabaseMetadata implements DatabaseMetaData {
       "LAST",
       "LZO",
       "LZ4",
+      "ZSTD",
       "LATEST",
       "LIKE",
       "METADATA",
diff --git a/node-commons/src/assembly/resources/conf/iotdb-common.properties b/node-commons/src/assembly/resources/conf/iotdb-common.properties
index ba9650da7a..99a7101bb3 100644
--- a/node-commons/src/assembly/resources/conf/iotdb-common.properties
+++ b/node-commons/src/assembly/resources/conf/iotdb-common.properties
@@ -765,7 +765,7 @@ cluster_name=defaultCluster
 # value_encoder=PLAIN
 
 # Compression configuration
-# Data compression method, supports UNCOMPRESSED, SNAPPY or LZ4. Default value is SNAPPY
+# Data compression method, supports UNCOMPRESSED, SNAPPY, ZSTD or LZ4. Default value is SNAPPY
 # compressor=SNAPPY
 
 # Maximum degree of a metadataIndex node, default value is 256
diff --git a/pom.xml b/pom.xml
index 0ff39c7a89..54bca94aee 100644
--- a/pom.xml
+++ b/pom.xml
@@ -165,6 +165,7 @@
         <javax.xml.bind.version>2.4.0-b180830.0359</javax.xml.bind.version>
         <felix.version>5.1.8</felix.version>
         <snappy.version>1.1.8.4</snappy.version>
+        <zstd-jni.version>1.5.4-2</zstd-jni.version>
         <netty.version>4.1.82.Final</netty.version>
         <!-- URL of the ASF SonarQube server -->
         <sonar.host.url>https://sonarcloud.io</sonar.host.url>
@@ -516,6 +517,11 @@
                 <artifactId>snappy-java</artifactId>
                 <version>${snappy.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.luben</groupId>
+                <artifactId>zstd-jni</artifactId>
+                <version>${zstd-jni.version}</version>
+            </dependency>
             <dependency>
                 <groupId>org.apache.thrift</groupId>
                 <artifactId>libthrift</artifactId>
diff --git a/tsfile/pom.xml b/tsfile/pom.xml
index 93c4a25e3a..9d1b709326 100644
--- a/tsfile/pom.xml
+++ b/tsfile/pom.xml
@@ -37,6 +37,10 @@
         <tsfile.ut.skip>${tsfile.test.skip}</tsfile.ut.skip>
     </properties>
     <dependencies>
+        <dependency>
+            <groupId>com.github.luben</groupId>
+            <artifactId>zstd-jni</artifactId>
+        </dependency>
         <dependency>
             <groupId>ch.qos.logback</groupId>
             <artifactId>logback-classic</artifactId>
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
index a1a5b22f88..861c702ae1 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/common/conf/TSFileConfig.java
@@ -112,7 +112,7 @@ public class TSFileConfig implements Serializable {
   private double freqEncodingSNR = 40;
   /** Default block size for FREQ encoding is 1024. */
   private int freqEncodingBlockSize = 1024;
-  /** Data compression method, TsFile supports UNCOMPRESSED, SNAPPY or LZ4. */
+  /** Data compression method, TsFile supports UNCOMPRESSED, SNAPPY, ZSTD or LZ4. */
   private CompressionType compressor = CompressionType.SNAPPY;
   /** Line count threshold for checking page memory occupied size. */
   private int pageCheckSizeThreshold = 100;
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
index 9243e53417..84654e2a00 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/ICompressor.java
@@ -23,6 +23,7 @@ import org.apache.iotdb.tsfile.exception.compress.CompressionTypeNotSupportedExc
 import org.apache.iotdb.tsfile.exception.compress.GZIPCompressOverflowException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
+import com.github.luben.zstd.Zstd;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import org.xerial.snappy.Snappy;
@@ -38,6 +39,7 @@ import java.util.zip.GZIPOutputStream;
 import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.GZIP;
 import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.LZ4;
 import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.SNAPPY;
+import static org.apache.iotdb.tsfile.file.metadata.enums.CompressionType.ZSTD;
 
 /** compress data according to type in schema. */
 public interface ICompressor extends Serializable {
@@ -65,6 +67,8 @@ public interface ICompressor extends Serializable {
         return new IOTDBLZ4Compressor();
       case GZIP:
         return new GZIPCompressor();
+      case ZSTD:
+        return new ZstdCompressor();
       default:
         throw new CompressionTypeNotSupportedException(name.toString());
     }
@@ -216,8 +220,9 @@ public interface ICompressor extends Serializable {
 
     @Override
     public int compress(ByteBuffer data, ByteBuffer compressed) {
+      int startPosition = compressed.position();
       compressor.compress(data, compressed);
-      return data.limit();
+      return compressed.position() - startPosition;
     }
 
     @Override
@@ -311,4 +316,58 @@ public interface ICompressor extends Serializable {
       return GZIP;
     }
   }
+
+  class ZstdCompressor implements ICompressor {
+
+    private int compressionLevel;
+
+    public ZstdCompressor() {
+      super();
+      compressionLevel = Zstd.maxCompressionLevel();
+    }
+
+    @Override
+    public byte[] compress(byte[] data) throws IOException {
+      return Zstd.compress(data, compressionLevel);
+    }
+
+    @Override
+    public byte[] compress(byte[] data, int offset, int length) throws IOException {
+      if (data == null) {
+        return new byte[0];
+      }
+      byte[] compressedData = new byte[getMaxBytesForCompression(length)];
+      int compressedSize = compress(data, offset, length, compressedData);
+      byte[] result = new byte[compressedSize];
+      System.arraycopy(compressedData, 0, result, 0, compressedSize);
+      return result;
+    }
+
+    @Override
+    public int compress(byte[] data, int offset, int length, byte[] compressed) throws IOException {
+      return (int)
+          Zstd.compressByteArray(
+              compressed, 0, compressed.length, data, offset, length, compressionLevel);
+    }
+
+    /**
+     * @param data MUST be DirectByteBuffer for Zstd.
+     * @param compressed MUST be DirectByteBuffer for Zstd.
+     * @return byte length of compressed data.
+     */
+    @Override
+    public int compress(ByteBuffer data, ByteBuffer compressed) throws IOException {
+      return Zstd.compress(compressed, data, compressionLevel);
+    }
+
+    @Override
+    public int getMaxBytesForCompression(int uncompressedDataSize) {
+      return (int) Zstd.compressBound(uncompressedDataSize);
+    }
+
+    @Override
+    public CompressionType getType() {
+      return ZSTD;
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
index b4b333e949..d12985d759 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/compress/IUnCompressor.java
@@ -22,6 +22,7 @@ package org.apache.iotdb.tsfile.compress;
 import org.apache.iotdb.tsfile.exception.compress.CompressionTypeNotSupportedException;
 import org.apache.iotdb.tsfile.file.metadata.enums.CompressionType;
 
+import com.github.luben.zstd.Zstd;
 import net.jpountz.lz4.LZ4Compressor;
 import net.jpountz.lz4.LZ4Factory;
 import net.jpountz.lz4.LZ4SafeDecompressor;
@@ -54,6 +55,8 @@ public interface IUnCompressor {
         return new LZ4UnCompressor();
       case GZIP:
         return new GZIPUnCompressor();
+      case ZSTD:
+        return new ZstdUnCompressor();
       default:
         throw new CompressionTypeNotSupportedException(name.toString());
     }
@@ -312,4 +315,45 @@ public interface IUnCompressor {
       return CompressionType.GZIP;
     }
   }
+
+  class ZstdUnCompressor implements IUnCompressor {
+
+    @Override
+    public int getUncompressedLength(byte[] array, int offset, int length) throws IOException {
+      return (int) Zstd.decompressedSize(array, offset, length);
+    }
+
+    @Override
+    public int getUncompressedLength(ByteBuffer buffer) throws IOException {
+      return (int) Zstd.decompressedSize(buffer);
+    }
+
+    @Override
+    public byte[] uncompress(byte[] byteArray) throws IOException {
+      return Zstd.decompress(byteArray, getUncompressedLength(byteArray, 0, byteArray.length));
+    }
+
+    @Override
+    public int uncompress(byte[] byteArray, int offset, int length, byte[] output, int outOffset)
+        throws IOException {
+      return (int)
+          Zstd.decompressByteArray(
+              output, outOffset, output.length, byteArray, offset, byteArray.length);
+    }
+
+    /**
+     * @param compressed MUST be DirectByteBuffer for Zstd.
+     * @param uncompressed MUST be DirectByteBuffer for Zstd.
+     * @return byte length of compressed data.
+     */
+    @Override
+    public int uncompress(ByteBuffer compressed, ByteBuffer uncompressed) throws IOException {
+      return Zstd.decompress(uncompressed, compressed);
+    }
+
+    @Override
+    public CompressionType getCodecName() {
+      return CompressionType.ZSTD;
+    }
+  }
 }
diff --git a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
index 1187d0376d..c78a84508a 100644
--- a/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
+++ b/tsfile/src/main/java/org/apache/iotdb/tsfile/file/metadata/enums/CompressionType.java
@@ -30,7 +30,9 @@ public enum CompressionType {
 
   /** LZ4 */
   // NOTICE: To ensure the compatibility of existing files, do not change the byte LZ4 binds to.
-  LZ4(".lz4", (byte) 7);
+  LZ4(".lz4", (byte) 7),
+  /** ZSTD */
+  ZSTD(".zstd", (byte) 8);
 
   private final String extensionName;
   private final byte index;
@@ -56,6 +58,8 @@ public enum CompressionType {
         return CompressionType.GZIP;
       case 7:
         return CompressionType.LZ4;
+      case 8:
+        return CompressionType.ZSTD;
       default:
         throw new IllegalArgumentException("Invalid input: " + compressor);
     }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/compress/CompressTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/compress/CompressTest.java
index d3d0406d41..c1339bc38d 100644
--- a/tsfile/src/test/java/org/apache/iotdb/tsfile/compress/CompressTest.java
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/compress/CompressTest.java
@@ -19,6 +19,7 @@
 package org.apache.iotdb.tsfile.compress;
 
 import org.apache.iotdb.tsfile.utils.PublicBAOS;
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
 
 import org.junit.After;
 import org.junit.Before;
@@ -26,6 +27,7 @@ import org.junit.Test;
 import org.xerial.snappy.Snappy;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.nio.charset.StandardCharsets;
 import java.util.Arrays;
 
@@ -103,4 +105,41 @@ public class CompressTest {
     String result = new String(uncompressed, StandardCharsets.UTF_8);
     assertEquals(inputString, result);
   }
+
+  @Test
+  public void zstdCompressorTest1() throws IOException {
+    PublicBAOS out = new PublicBAOS();
+    out.write(inputString.getBytes(StandardCharsets.UTF_8));
+    ICompressor compressor = new ICompressor.ZstdCompressor();
+    IUnCompressor unCompressor = new IUnCompressor.ZstdUnCompressor();
+    byte[] compressed = compressor.compress(out.getBuf());
+    byte[] uncompressed = new byte[out.size()];
+    unCompressor.uncompress(compressed, 0, compressed.length, uncompressed, 0);
+    String result = new String(uncompressed, StandardCharsets.UTF_8);
+    assertEquals(inputString, result);
+  }
+
+  @Test
+  public void zstdCompressorTest2() throws IOException {
+    byte[] input = inputString.getBytes();
+    ICompressor compressor = new ICompressor.ZstdCompressor();
+    IUnCompressor unCompressor = new IUnCompressor.ZstdUnCompressor();
+    ByteBuffer data = ByteBuffer.allocateDirect(input.length);
+    data.put(input);
+    data.position(0);
+    ByteBuffer compressed =
+        ByteBuffer.allocateDirect(compressor.getMaxBytesForCompression(input.length));
+    int compressedSize = compressor.compress(data, compressed);
+    byte[] compressedData = new byte[compressedSize];
+    compressed.position(0);
+    ByteBuffer compressedDataBuffer = ByteBuffer.allocateDirect(compressedSize);
+    compressed.get(compressedData, 0, compressedSize);
+    compressedDataBuffer.put(compressedData);
+    compressedDataBuffer.position(0);
+
+    ByteBuffer uncompressed = ByteBuffer.allocateDirect(input.length);
+    int uncompressedSize = unCompressor.uncompress(compressedDataBuffer, uncompressed);
+    uncompressed.position(0);
+    assertEquals(inputString, ReadWriteIOUtils.readStringFromDirectByteBuffer(uncompressed));
+  }
 }
diff --git a/tsfile/src/test/java/org/apache/iotdb/tsfile/compress/ZstdTest.java b/tsfile/src/test/java/org/apache/iotdb/tsfile/compress/ZstdTest.java
new file mode 100644
index 0000000000..348d1bd664
--- /dev/null
+++ b/tsfile/src/test/java/org/apache/iotdb/tsfile/compress/ZstdTest.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.tsfile.compress;
+
+import org.apache.iotdb.tsfile.utils.ReadWriteIOUtils;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.concurrent.ThreadLocalRandom;
+
+public class ZstdTest {
+
+  private String randomString(int length) {
+    StringBuilder builder = new StringBuilder(length);
+    for (int i = 0; i < length; i++) {
+      builder.append((char) (ThreadLocalRandom.current().nextInt(33, 128)));
+    }
+    return builder.toString();
+  }
+
+  @Before
+  public void setUp() {}
+
+  @After
+  public void tearDown() {}
+
+  @Test
+  public void testBytes1() throws IOException {
+    for (int i = 1; i < 500000; i += 100000) {
+      String input = randomString(i);
+      ByteBuffer source = ByteBuffer.allocateDirect(input.getBytes().length);
+      source.put(input.getBytes());
+      source.flip();
+
+      ICompressor compressor = new ICompressor.ZstdCompressor();
+      ByteBuffer compressed =
+          ByteBuffer.allocateDirect(compressor.getMaxBytesForCompression(input.getBytes().length));
+      compressor.compress(source, compressed);
+
+      IUnCompressor unCompressor = new IUnCompressor.ZstdUnCompressor();
+      ByteBuffer uncompressedByteBuffer = ByteBuffer.allocateDirect(input.getBytes().length);
+      compressed.flip();
+      unCompressor.uncompress(compressed, uncompressedByteBuffer);
+
+      uncompressedByteBuffer.flip();
+      String afterDecode = ReadWriteIOUtils.readStringFromDirectByteBuffer(uncompressedByteBuffer);
+      Assert.assertEquals(afterDecode, input);
+    }
+  }
+
+  @Test
+  public void testBytes2() throws IOException {
+    ICompressor compressor = new ICompressor.ZstdCompressor();
+    IUnCompressor unCompressor = new IUnCompressor.ZstdUnCompressor();
+
+    int n = 500000;
+    String input = randomString(n);
+    byte[] uncom = input.getBytes(StandardCharsets.UTF_8);
+    byte[] compressed = compressor.compress(uncom, 0, uncom.length);
+    // length should be same
+    Assert.assertEquals(compressor.compress(uncom).length, compressed.length);
+    byte[] uncompressed = unCompressor.uncompress(compressed);
+    Assert.assertArrayEquals(uncom, uncompressed);
+  }
+}