Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/16 13:57:30 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2][File] Support compress (#3899)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 55602f6b1 [Feature][Connector-V2][File] Support compress (#3899)
55602f6b1 is described below

commit 55602f6b1c6e7ae477e1952d69bccd468de6bc81
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Mon Jan 16 21:57:21 2023 +0800

    [Feature][Connector-V2][File] Support compress (#3899)
    
    * [Feature][Connector-V2][File] Support compress
    
    * [Feature][Connector-V2][File] Update e2e tests
    
    * [Feature][Connector-V2][File] Update docs
    
    * [Improve][Connector-V2][File] Update docs
    
    * [Improve][Connector-V2][File] Update option rule
---
 docs/en/connector-v2/sink/FtpFile.md               | 15 +++++-
 docs/en/connector-v2/sink/HdfsFile.md              | 17 ++++--
 docs/en/connector-v2/sink/LocalFile.md             | 13 +++++
 docs/en/connector-v2/sink/OssFile.md               | 15 +++++-
 docs/en/connector-v2/sink/OssJindoFile.md          | 25 +++++++--
 docs/en/connector-v2/sink/S3File.md                | 63 +++++++++++++---------
 docs/en/connector-v2/sink/SftpFile.md              | 19 ++++++-
 .../seatunnel/file/config/BaseFileSinkConfig.java  | 17 ++----
 .../seatunnel/file/config/BaseSinkConfig.java      | 25 +++++++--
 .../seatunnel/file/config/CompressConfig.java      | 22 --------
 .../seatunnel/file/config/CompressFormat.java      | 40 ++++++++++----
 .../file/sink/writer/AbstractWriteStrategy.java    |  6 +--
 .../file/sink/writer/JsonWriteStrategy.java        | 17 +++++-
 .../file/sink/writer/OrcWriteStrategy.java         |  4 +-
 .../file/sink/writer/ParquetWriteStrategy.java     |  5 +-
 .../file/sink/writer/TextWriteStrategy.java        | 45 +++++++---------
 .../file/ftp/sink/FtpFileSinkFactory.java          |  6 ++-
 .../file/hdfs/sink/HdfsFileSinkFactory.java        |  6 ++-
 .../file/local/sink/LocalFileSinkFactory.java      |  6 ++-
 .../file/oss/sink/OssFileSinkFactory.java          |  6 ++-
 .../file/oss/sink/OssFileSinkFactory.java          |  6 ++-
 .../seatunnel/file/s3/sink/S3FileSinkFactory.java  |  7 ++-
 .../file/sftp/sink/SftpFileSinkFactory.java        |  6 ++-
 .../test/resources/orc/fake_to_local_file_orc.conf |  1 +
 .../parquet/fake_to_local_file_parquet.conf        |  1 +
 .../resources/text/fake_to_local_file_text.conf    |  1 +
 26 files changed, 268 insertions(+), 126 deletions(-)
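For context, the new option slots into a file sink block like any other sink option. A minimal sketch of its use (the `LocalFile` sink and the `/tmp/seatunnel/sink` path here are illustrative choices, not taken from this commit):

```hocon
LocalFile {
    path = "/tmp/seatunnel/sink"
    file_format = "text"
    # New in this commit: pick a codec supported by the chosen file_format
    # (see the compress_codec sections added to the docs below)
    compress_codec = "lzo"
}
```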

diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md
index 5d7c00996..5380c6967 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -49,6 +49,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                                       |                                                           |
 | batch_size                       | int     | no       | 1000000                                    |                                                           |
+| compress_codec                   | string  | no       | none                                       |                                                           |
 | common-options                   | object  | no       | -                                          |                                                           |
 
 ### host [string]
@@ -157,6 +158,16 @@ Only support `true` now.
 
 The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is jointly determined by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.
 
+### compress_codec [string]
+
+The compress codec to use for files. The supported codecs for each file format are:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -224,4 +235,6 @@ FtpFile {
   - Sink columns mapping failed
   - When restore writer from states getting transaction directly failed
 
-- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index 3790fe01b..dfb115ec4 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -50,6 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                                       |                                                           |
 | batch_size                       | int     | no       | 1000000                                    |                                                           |
+| compress_codec                   | string  | no       | none                                       |                                                           |
 | kerberos_principal               | string  | no       | -                                          |                                                           |
 | kerberos_keytab_path             | string  | no       | -                                          |                                                           |
 | compress_codec                   | string  | no       | none                                       |                                                           |
@@ -153,6 +154,16 @@ Only support `true` now.
 
 The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is jointly determined by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.
 
+### compress_codec [string]
+
+The compress codec to use for files. The supported codecs for each file format are:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
 ### kerberos_principal [string]
 
 The principal of kerberos
@@ -161,9 +172,6 @@ The principal of kerberos
 
 The keytab path of kerberos
 
-### compressCodec [string]
-Support lzo compression for text in file format. The file name ends with ".lzo.txt" .
-
 ### common options
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
 
@@ -245,4 +253,5 @@ HdfsFile {
 ### Next version
 - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
 - [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
-- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
\ No newline at end of file
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md
index ab5753f27..9d9f4d2ac 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -45,6 +45,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                                       |                                                           |
 | batch_size                       | int     | no       | 1000000                                    |                                                           |
+| compress_codec                   | string  | no       | none                                       |                                                           |
 | common-options                   | object  | no       | -                                          |                                                           |
 
 ### path [string]
@@ -137,6 +138,16 @@ Only support `true` now.
 
 The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is jointly determined by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.
 
+### compress_codec [string]
+
+The compress codec to use for files. The supported codecs for each file format are:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -206,3 +217,5 @@ LocalFile {
   - When restore writer from states getting transaction directly failed
 
 - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
index a45983ea7..5cef80d16 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -52,6 +52,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                                       |                                                           |
 | batch_size                       | int     | no       | 1000000                                    |                                                           |
+| compress_codec                   | string  | no       | none                                       |                                                           |
 | common-options                   | object  | no       | -                                          |                                                           |
 
 ### path [string]
@@ -160,6 +161,16 @@ Only support `true` now.
 
 The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is jointly determined by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.
 
+### compress_codec [string]
+
+The compress codec to use for files. The supported codecs for each file format are:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -244,4 +255,6 @@ For orc file format simple config
   - Sink columns mapping failed
   - When restore writer from states getting transaction directly failed
 
-- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md
index 92fb7131d..4c0ccb9a0 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -52,6 +52,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                                       |                                                           |
 | batch_size                       | int     | no       | 1000000                                    |                                                           |
+| compress_codec                   | string  | no       | none                                       |                                                           |
 | common-options                   | object  | no       | -                                          |                                                           |
 
 ### path [string]
@@ -156,6 +157,20 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
 
 Only support `true` now.
 
+### batch_size [int]
+
+The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is jointly determined by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.
+
+### compress_codec [string]
+
+The compress codec to use for files. The supported codecs for each file format are:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -166,7 +181,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
 
 ```hocon
 
-  OssFile {
+  OssJindoFile {
     path="/seatunnel/sink"
     bucket = "oss://tyrantlucifer-image-bed"
     access_key = "xxxxxxxxxxx"
@@ -192,7 +207,7 @@ For parquet file format with `sink_columns`
 
 ```hocon
 
-  OssFile {
+  OssJindoFile {
     path = "/seatunnel/sink"
     bucket = "oss://tyrantlucifer-image-bed"
     access_key = "xxxxxxxxxxx"
@@ -221,6 +236,10 @@ For orc file format simple config
 
 ## Changelog
 
+### 2.3.0 2022-12-30
+
+- Add OSS Jindo File Sink Connector
+
 ### Next version
 
-- Add OSS Jindo File Sink Connector
\ No newline at end of file
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index a9040e9bc..76ce51342 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -31,28 +31,29 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ## Options
 
-| name                              | type    | required | default value                                         | remarks                                                                                                |
-|-----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
-| path                              | string  | yes      | -                                                     |                                                                                                        |
-| bucket                            | string  | yes      | -                                                     |                                                                                                        |
-| fs.s3a.endpoint                   | string  | yes      | -                                                     |                                                                                                        |
-| fs.s3a.aws.credentials.provider   | string  | yes      | com.amazonaws.auth.InstanceProfileCredentialsProvider |                                                                                                        |
-| access_key                        | string  | no       | -                                                     | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
-| access_secret                     | string  | no       | -                                                     | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
-| custom_filename                   | boolean | no       | false                                                 | Whether you need custom the filename                                                                   |
-| file_name_expression              | string  | no       | "${transactionId}"                                    | Only used when custom_filename is true                                                                 |
-| filename_time_format              | string  | no       | "yyyy.MM.dd"                                          | Only used when custom_filename is true                                                                 |
-| file_format                       | string  | no       | "csv"                                                 |                                                                                                        |
-| field_delimiter                   | string  | no       | '\001'                                                | Only used when file_format is text                                                                     |
-| row_delimiter                     | string  | no       | "\n"                                                  | Only used when file_format is text                                                                     |
-| have_partition                    | boolean | no       | false                                                 | Whether you need processing partitions.                                                                |
-| partition_by                      | array   | no       | -                                                     | Only used then have_partition is true                                                                  |
-| partition_dir_expression          | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"            | Only used then have_partition is true                                                                  |
-| is_partition_field_write_in_file  | boolean | no       | false                                                 | Only used then have_partition is true                                                                  |
-| sink_columns                      | array   | no       |                                                       | When this parameter is empty, all fields are sink columns                                              |
-| is_enable_transaction             | boolean | no       | true                                                  |                                                                                                        |
-| batch_size                        | int     | no       | 1000000                                               |                                                                                                        |
-| common-options                    | object  | no       | -                                                     |                                                                                                        |
+| name                             | type    | required | default value                                         | remarks                                                                                                |
+|----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
+| path                             | string  | yes      | -                                                     |                                                                                                        |
+| bucket                           | string  | yes      | -                                                     |                                                                                                        |
+| fs.s3a.endpoint                  | string  | yes      | -                                                     |                                                                                                        |
+| fs.s3a.aws.credentials.provider  | string  | yes      | com.amazonaws.auth.InstanceProfileCredentialsProvider |                                                                                                        |
+| access_key                       | string  | no       | -                                                     | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
+| access_secret                    | string  | no       | -                                                     | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
+| custom_filename                  | boolean | no       | false                                                 | Whether you need to customize the filename                                                             |
+| file_name_expression             | string  | no       | "${transactionId}"                                    | Only used when custom_filename is true                                                                 |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                                          | Only used when custom_filename is true                                                                 |
+| file_format                      | string  | no       | "csv"                                                 |                                                                                                        |
+| field_delimiter                  | string  | no       | '\001'                                                | Only used when file_format is text                                                                     |
+| row_delimiter                    | string  | no       | "\n"                                                  | Only used when file_format is text                                                                     |
+| have_partition                   | boolean | no       | false                                                 | Whether you need to process partitions                                                                 |
+| partition_by                     | array   | no       | -                                                     | Only used when have_partition is true                                                                  |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"            | Only used when have_partition is true                                                                  |
+| is_partition_field_write_in_file | boolean | no       | false                                                 | Only used when have_partition is true                                                                  |
+| sink_columns                     | array   | no       |                                                       | When this parameter is empty, all fields are sink columns                                              |
+| is_enable_transaction            | boolean | no       | true                                                  |                                                                                                        |
+| batch_size                       | int     | no       | 1000000                                               |                                                                                                        |
+| compress_codec                   | string  | no       | none                                                  |                                                                                                        |
+| common-options                   | object  | no       | -                                                     |                                                                                                        |
 
 ### path [string]
 
@@ -174,6 +175,16 @@ Only support `true` now.
 
 The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is jointly determined by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.
 
+### compress_codec [string]
+
+The compress codec to use for files. The supported codecs for each file format are:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -254,7 +265,7 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden
 
 - Add S3File Sink Connector
 
-### Next version
+### 2.3.0 2022-12-30
 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
   - When field from upstream is null it will throw NullPointerException
   - Sink columns mapping failed
@@ -264,4 +275,8 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden
   - Allow the use of the s3a protocol
   - Decouple hadoop-aws dependencies
 - [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
-- [Feature]Set S3 AK to optional ([3688](https://github.com/apache/incubator-seatunnel/pull/))
\ No newline at end of file
+- [Feature] Set S3 AK to optional ([3688](https://github.com/apache/incubator-seatunnel/pull/3688))
+
+### Next version
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md
index d71ea8b7d..4879cde7b 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -49,6 +49,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | sink_columns                     | array   | no       |                                            | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                                       |                                                           |
 | batch_size                       | int     | no       | 1000000                                    |                                                           |
+| compress_codec                   | string  | no       | none                                       |                                                           |
 | common-options                   | object  | no       | -                                          |                                                           |
 
 ### host [string]
@@ -157,6 +158,16 @@ Only support `true` now.
 
 The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is jointly determined by `batch_size` and `checkpoint.interval`. If `checkpoint.interval` is large enough, the sink writer will keep writing rows into one file until the row count exceeds `batch_size`; if `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint triggers.
 
+### compress_codec [string]
+
+The compress codec to use for files. The supported codecs for each file format are:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -191,7 +202,7 @@ SftpFile {
 
 ## Changelog
 
-### Next version
+### 2.3.0 2022-12-30
 
 - Add SftpFile Sink Connector
 - [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
@@ -199,4 +210,8 @@ SftpFile {
   - Sink columns mapping failed
   - When restore writer from states getting transaction directly failed
 
-- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+
+### Next version
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 57e8079cb..c38f43ae9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -19,11 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.file.config;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
-import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -35,9 +33,9 @@ import java.io.Serializable;
 import java.util.Locale;
 
 @Data
-public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Serializable {
+public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
     private static final long serialVersionUID = 1L;
-    protected String compressCodec;
+    protected CompressFormat compressFormat = BaseSinkConfig.COMPRESS_CODEC.defaultValue();
     protected String fieldDelimiter = BaseSinkConfig.FIELD_DELIMITER.defaultValue();
     protected String rowDelimiter = BaseSinkConfig.ROW_DELIMITER.defaultValue();
     protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
@@ -50,15 +48,8 @@ public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Seri
 
     public BaseFileSinkConfig(@NonNull Config config) {
         if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
-            CompressFormat compressFormat = CompressFormat.valueOf(config.getString(BaseSinkConfig.COMPRESS_CODEC.key()).toUpperCase(Locale.ROOT));
-            switch (compressFormat) {
-                case LZO:
-                    this.compressCodec = compressFormat.getCompressCodec();
-                    break;
-                default:
-                    throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
-                            "Compress not supported this compress code by SeaTunnel file connector now");
-            }
+            String compressCodec = config.getString(BaseSinkConfig.COMPRESS_CODEC.key());
+            this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT));
         }
         if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
             this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 5abffc13f..572a49fd3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
 
+import java.util.Arrays;
 import java.util.List;
 
 public class BaseSinkConfig {
@@ -37,11 +38,29 @@ public class BaseSinkConfig {
     public static final String DEFAULT_FILE_NAME_EXPRESSION = "${transactionId}";
     public static final int DEFAULT_BATCH_SIZE = 1000000;
 
-    public static final Option<String> COMPRESS_CODEC = Options.key("compress_codec")
-            .stringType()
-            .noDefaultValue()
+    public static final Option<CompressFormat> COMPRESS_CODEC = Options.key("compress_codec")
+            .enumType(CompressFormat.class)
+            .defaultValue(CompressFormat.NONE)
             .withDescription("Compression codec");
 
+    public static final Option<CompressFormat> TXT_COMPRESS = Options.key("compress_codec")
+            .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO))
+            .defaultValue(CompressFormat.NONE)
+            .withDescription("Compression codecs supported by txt, csv and json files");
+
+    public static final Option<CompressFormat> PARQUET_COMPRESS = Options.key("compress_codec")
+            .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO,
+                    CompressFormat.SNAPPY, CompressFormat.LZ4, CompressFormat.GZIP,
+                    CompressFormat.BROTLI, CompressFormat.ZSTD))
+            .defaultValue(CompressFormat.NONE)
+            .withDescription("Compression codecs supported by parquet files");
+
+    public static final Option<CompressFormat> ORC_COMPRESS = Options.key("compress_codec")
+            .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO,
+                    CompressFormat.SNAPPY, CompressFormat.LZ4, CompressFormat.ZLIB))
+            .defaultValue(CompressFormat.NONE)
+            .withDescription("Compression codecs supported by orc files");
+
     public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
             .enumType(DateUtils.Formatter.class)
             .defaultValue(DateUtils.Formatter.YYYY_MM_DD)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java
deleted file mode 100644
index 48d47c8d1..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.config;
-
-public interface CompressConfig {
-    String getCompressCodec();
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
index 6449f1845..6483ae9c8 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
@@ -17,29 +17,49 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
+import org.apache.orc.CompressionKind;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
 import java.io.Serializable;
 
 public enum CompressFormat implements Serializable {
+    // supported by text, csv, json, orc and parquet
+    LZO(".lzo", CompressionKind.LZO, CompressionCodecName.LZO),
+
+    // orc and parquet support
+    NONE("", CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED),
+    SNAPPY(".snappy", CompressionKind.SNAPPY, CompressionCodecName.SNAPPY),
+    LZ4(".lz4", CompressionKind.LZ4, CompressionCodecName.LZ4),
+
+    // only orc support
+    ZLIB(".zlib", CompressionKind.ZLIB, CompressionCodecName.UNCOMPRESSED),
 
-    LZO("lzo"),
-    NONE("none");
+    // only parquet support
+    GZIP(".gz", CompressionKind.NONE, CompressionCodecName.GZIP),
+    BROTLI(".br", CompressionKind.NONE, CompressionCodecName.BROTLI),
+    ZSTD(".zstd", CompressionKind.NONE, CompressionCodecName.ZSTD);
 
     private final String compressCodec;
+    private final CompressionKind orcCompression;
+    private final CompressionCodecName parquetCompression;
 
-    CompressFormat(String compressCodec) {
+    CompressFormat(String compressCodec,
+                   CompressionKind orcCompression,
+                   CompressionCodecName parquetCompression) {
         this.compressCodec = compressCodec;
+        this.orcCompression = orcCompression;
+        this.parquetCompression = parquetCompression;
     }
 
     public String getCompressCodec() {
         return compressCodec;
     }
 
-    public static CompressFormat getCompressFormat(String value) {
-        switch (value) {
-            case "lzo":
-                return CompressFormat.LZO;
-            default:
-                return CompressFormat.NONE;
-        }
+    public CompressionKind getOrcCompression() {
+        return orcCompression;
+    }
+
+    public CompressionCodecName getParquetCompression() {
+        return parquetCompression;
     }
 }
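The enum above bundles, for each codec, a file-name suffix plus the matching ORC and Parquet library constants. A stripped-down, dependency-free sketch of the same pattern (hypothetical `CompressSuffix` enum; suffixes copied from the diff, the ORC/Parquet fields omitted):

```java
import java.util.Locale;

// Each constant carries the file-name suffix it contributes, mirroring
// CompressFormat: NONE contributes an empty string so uncompressed files
// keep their plain format suffix.
enum CompressSuffix {
    NONE(""),
    LZO(".lzo"),
    SNAPPY(".snappy"),
    GZIP(".gz");

    private final String suffix;

    CompressSuffix(String suffix) {
        this.suffix = suffix;
    }

    public String suffix() {
        return suffix;
    }

    // Case-insensitive lookup mirroring
    // CompressFormat.valueOf(codec.toUpperCase(Locale.ROOT))
    public static CompressSuffix of(String codec) {
        return valueOf(codec.toUpperCase(Locale.ROOT));
    }
}
```

`Locale.ROOT` is used so the lookup is not affected by locale-specific case rules (e.g. the Turkish dotless i).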
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 41bd8ab08..329af511a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -63,6 +63,7 @@ import java.util.stream.Collectors;
 public abstract class AbstractWriteStrategy implements WriteStrategy {
     protected final Logger log = LoggerFactory.getLogger(this.getClass());
     protected final FileSinkConfig fileSinkConfig;
+    protected final CompressFormat compressFormat;
     protected final List<Integer> sinkColumnsIndexInRow;
     protected String jobId;
     protected int subTaskIndex;
@@ -90,6 +91,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
         this.fileSinkConfig = fileSinkConfig;
         this.sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow();
         this.batchSize = fileSinkConfig.getBatchSize();
+        this.compressFormat = fileSinkConfig.getCompressFormat();
     }
 
     /**
@@ -222,9 +224,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
         String fileNameExpression = fileSinkConfig.getFileNameExpression();
         FileFormat fileFormat = fileSinkConfig.getFileFormat();
         String suffix = fileFormat.getSuffix();
-        if (CompressFormat.LZO.getCompressCodec().equals(fileSinkConfig.getCompressCodec())) {
-            suffix = "." + CompressFormat.LZO.getCompressCodec() + "." + suffix;
-        }
+        suffix = compressFormat.getCompressCodec() + suffix;
         if (StringUtils.isBlank(fileNameExpression)) {
             return transactionId + suffix;
         }
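The suffix handling above now always prepends the codec's file-name suffix (empty for `NONE`) to the format suffix. A dependency-free sketch of that rule (hypothetical helper, not code from this commit):

```java
// The new naming rule: compress suffix comes before the format suffix,
// so an LZO-compressed text file for transaction "txn_123" becomes
// "txn_123.lzo.txt", while an uncompressed one stays "txn_123.txt".
class FileNameSuffix {
    static String fileName(String transactionId,
                           String compressSuffix,
                           String formatSuffix) {
        return transactionId + compressSuffix + formatSuffix;
    }
}
```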
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index 9ae10fd35..7d9b7ad0d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -25,10 +25,12 @@ import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorExc
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
 
+import io.airlift.compress.lzo.LzopCodec;
 import lombok.NonNull;
 import org.apache.hadoop.fs.FSDataOutputStream;
 
 import java.io.IOException;
+import java.io.OutputStream;
 import java.util.HashMap;
 import java.util.Map;
 
@@ -93,7 +95,20 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
         FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
         if (fsDataOutputStream == null) {
             try {
-                fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+                switch (compressFormat) {
+                    case LZO:
+                        LzopCodec lzo = new LzopCodec();
+                        OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
+                        fsDataOutputStream = new FSDataOutputStream(out, null);
+                        break;
+                    case NONE:
+                        fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+                        break;
+                    default:
+                        log.warn("Json file does not support compression type: {}", compressFormat.getCompressCodec());
+                        fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+                        break;
+                }
                 beingWrittenOutputStream.put(filePath, fsDataOutputStream);
                 isFirstWrite.put(filePath, true);
             } catch (IOException e) {
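The switch above wraps the raw output stream in an LZOP-compressing stream when LZO is selected, and falls back to the plain stream otherwise. The same decorator pattern can be sketched with the JDK's `GZIPOutputStream` standing in for aircompressor's `LzopCodec` (which requires the external `io.airlift:aircompressor` dependency):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

class CompressedStreamSketch {
    // Wrap the raw stream with a compressing decorator, or return it
    // unchanged for "none" / unsupported codecs -- the same shape as the
    // connector's switch (which logs a warning before falling back).
    static OutputStream wrap(OutputStream raw, String codec) throws IOException {
        switch (codec) {
            case "gzip":
                return new GZIPOutputStream(raw);
            case "none":
            default:
                return raw;
        }
    }

    public static void main(String[] args) throws IOException {
        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
        try (OutputStream out = wrap(buffer, "gzip")) {
            out.write("{\"k\":1}\n".getBytes(StandardCharsets.UTF_8));
        }
        byte[] bytes = buffer.toByteArray();
        // a gzip stream starts with the magic bytes 0x1f 0x8b
        System.out.println((bytes[0] & 0xff) == 0x1f && (bytes[1] & 0xff) == 0x8b);
    }
}
```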
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index af5b774e9..507dafdcc 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -30,7 +30,6 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig
 
 import lombok.NonNull;
 import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionKind;
 import org.apache.orc.OrcFile;
 import org.apache.orc.TypeDescription;
 import org.apache.orc.Writer;
@@ -113,8 +112,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
             try {
                 OrcFile.WriterOptions options = OrcFile.writerOptions(getConfiguration(hadoopConf))
                     .setSchema(schema)
-                    // temporarily used snappy
-                    .compress(CompressionKind.SNAPPY)
+                    .compress(compressFormat.getOrcCompression())
                     // use orc version 0.12
                     .version(OrcFile.Version.V_0_12)
                     .overwrite(true);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 6c431605a..ad05c123e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -42,7 +42,6 @@ import org.apache.parquet.avro.AvroSchemaConverter;
 import org.apache.parquet.column.ParquetProperties;
 import org.apache.parquet.hadoop.ParquetFileWriter;
 import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.parquet.hadoop.util.HadoopOutputFile;
 import org.apache.parquet.schema.ConversionPatterns;
 import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -141,9 +140,7 @@ public class ParquetWriteStrategy extends AbstractWriteStrategy {
                         .withDataModel(dataModel)
                         // use parquet v1 to improve compatibility
                         .withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
-                        // Temporarily use snappy compress
-                        // I think we can use the compress option in config to control this
-                        .withCompressionCodec(CompressionCodecName.SNAPPY)
+                        .withCompressionCodec(compressFormat.getParquetCompression())
                         .withSchema(schema)
                         .build();
                 this.beingWrittenWriter.put(filePath, newWriter);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index b0ba66746..6698e2ee2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -36,7 +35,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.HashMap;
-import java.util.Locale;
 import java.util.Map;
 
 public class TextWriteStrategy extends AbstractWriteStrategy {
@@ -48,18 +46,16 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
     private final DateTimeUtils.Formatter dateTimeFormat;
     private final TimeUtils.Formatter timeFormat;
     private SerializationSchema serializationSchema;
-    private String compressCodec;
 
-    public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
-        super(textFileSinkConfig);
+    public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
+        super(fileSinkConfig);
         this.beingWrittenOutputStream = new HashMap<>();
         this.isFirstWrite = new HashMap<>();
-        this.fieldDelimiter = textFileSinkConfig.getFieldDelimiter();
-        this.rowDelimiter = textFileSinkConfig.getRowDelimiter();
-        this.dateFormat = textFileSinkConfig.getDateFormat();
-        this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat();
-        this.timeFormat = textFileSinkConfig.getTimeFormat();
-        this.compressCodec = textFileSinkConfig.getCompressCodec();
+        this.fieldDelimiter = fileSinkConfig.getFieldDelimiter();
+        this.rowDelimiter = fileSinkConfig.getRowDelimiter();
+        this.dateFormat = fileSinkConfig.getDateFormat();
+        this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
+        this.timeFormat = fileSinkConfig.getTimeFormat();
     }
 
     @Override
@@ -117,21 +113,20 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
         FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
         if (fsDataOutputStream == null) {
             try {
-                if (compressCodec != null) {
-                    CompressFormat compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT));
-                    switch (compressFormat) {
-                        case LZO:
-                            LzopCodec lzo = new LzopCodec();
-                            OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
-                            fsDataOutputStream = new FSDataOutputStream(out, null);
-                            break;
-                        default:
-                            fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
-                    }
-                } else {
-                    fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+                switch (compressFormat) {
+                    case LZO:
+                        LzopCodec lzo = new LzopCodec();
+                        OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
+                        fsDataOutputStream = new FSDataOutputStream(out, null);
+                        break;
+                    case NONE:
+                        fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+                        break;
+                    default:
+                        log.warn("Text file does not support compression type: {}", compressFormat.getCompressCodec());
+                        fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+                        break;
                 }
-
                 beingWrittenOutputStream.put(filePath, fsDataOutputStream);
                 isFirstWrite.put(filePath, true);
             } catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index d5a246b61..35e611ced 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -44,7 +44,11 @@ public class FtpFileSinkFactory implements TableSinkFactory {
             .required(FtpConfig.FTP_PASSWORD)
             .optional(BaseSinkConfig.FILE_FORMAT)
             .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
-                BaseSinkConfig.FIELD_DELIMITER)
+                BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index d7a27b5cc..68fc6e749 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -41,7 +41,11 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
             .required(BaseSinkConfig.FILE_PATH)
             .optional(BaseSinkConfig.FILE_FORMAT)
             .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
-                BaseSinkConfig.FIELD_DELIMITER)
+                BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index dfc14884a..bf7390c16 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -39,7 +39,11 @@ public class LocalFileSinkFactory implements TableSinkFactory {
             .required(BaseSinkConfig.FILE_PATH)
             .optional(BaseSinkConfig.FILE_FORMAT)
             .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
-                BaseSinkConfig.FIELD_DELIMITER)
+                BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 55b27ffa1..8495d95b9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -44,7 +44,11 @@ public class OssFileSinkFactory implements TableSinkFactory {
             .required(OssConfig.ENDPOINT)
             .optional(BaseSinkConfig.FILE_FORMAT)
             .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
-                BaseSinkConfig.FIELD_DELIMITER)
+                BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 55b27ffa1..8495d95b9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -44,7 +44,11 @@ public class OssFileSinkFactory implements TableSinkFactory {
             .required(OssConfig.ENDPOINT)
             .optional(BaseSinkConfig.FILE_FORMAT)
             .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
-                BaseSinkConfig.FIELD_DELIMITER)
+                BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 01a3c55af..3a0a95b41 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -44,7 +44,12 @@ public class S3FileSinkFactory implements TableSinkFactory {
                 .conditional(S3Config.S3A_AWS_CREDENTIALS_PROVIDER, S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider, S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
                 .optional(S3Config.S3_PROPERTIES)
                 .optional(BaseSinkConfig.FILE_FORMAT)
-                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, BaseSinkConfig.FIELD_DELIMITER)
+                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+                    BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT)
                 .optional(BaseSinkConfig.HAVE_PARTITION)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index f2be6d2d0..bc61079fe 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -44,7 +44,11 @@ public class SftpFileSinkFactory implements TableSinkFactory {
             .required(SftpConfig.SFTP_PASSWORD)
             .optional(BaseSinkConfig.FILE_FORMAT)
             .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
-                BaseSinkConfig.FIELD_DELIMITER)
+                BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
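The three factory hunks above all encode the same dispatch: the row-based formats (text, csv, json) share one plain-text compress option, while orc and parquet each get a columnar-specific one. A minimal standalone sketch of that mapping (hypothetical helper, not the actual SeaTunnel `OptionRule` API):

```java
import java.util.Locale;

public class CompressOptionRuleSketch {

    // Mirrors the conditional() chains in the sink factories above:
    // given a file_format value, return the name of the compress
    // option constant that becomes available for it.
    static String compressOptionFor(String fileFormat) {
        switch (fileFormat.toLowerCase(Locale.ROOT)) {
            case "text":
            case "csv":
            case "json":
                return "TXT_COMPRESS";      // BaseSinkConfig.TXT_COMPRESS
            case "orc":
                return "ORC_COMPRESS";      // BaseSinkConfig.ORC_COMPRESS
            case "parquet":
                return "PARQUET_COMPRESS";  // BaseSinkConfig.PARQUET_COMPRESS
            default:
                throw new IllegalArgumentException(
                        "no compress option for file_format: " + fileFormat);
        }
    }

    public static void main(String[] args) {
        System.out.println(compressOptionFor("json"));
        System.out.println(compressOptionFor("parquet"));
    }
}
```

Note the design choice visible in the diff: text keeps `ROW_DELIMITER`/`FIELD_DELIMITER` on its branch, so only the compress option is shared across the three row-based formats.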
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
index cbbc227b3..a134e9fc0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
@@ -75,5 +75,6 @@ sink {
     file_format = "orc"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
+    compress_codec = "zlib"
   }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
index 454e27f41..c3eae33b0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
@@ -75,5 +75,6 @@ sink {
     file_format = "parquet"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
+    compress_codec = "gzip"
   }
 }
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
index 54227c8ee..795d82234 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
@@ -75,5 +75,6 @@ sink {
     file_format = "text"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
+    compress_codec = "lzo"
   }
 }
\ No newline at end of file
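Taken together, the e2e configs above exercise one codec per format: `zlib` for orc, `gzip` for parquet, and `lzo` for text. A minimal sink block enabling the new option might look like the following (sketch only; the plugin name and `path` are illustrative, and the full codec list lives in `CompressFormat.java`, which is not shown in this excerpt):

```
sink {
  LocalFile {
    path = "/tmp/seatunnel/text"
    file_format = "text"
    compress_codec = "lzo"
  }
}
```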