Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/01/16 13:57:30 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2][File] Support compress (#3899)
This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 55602f6b1 [Feature][Connector-V2][File] Support compress (#3899)
55602f6b1 is described below
commit 55602f6b1c6e7ae477e1952d69bccd468de6bc81
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Mon Jan 16 21:57:21 2023 +0800
[Feature][Connector-V2][File] Support compress (#3899)
* [Feature][Connector-V2][File] Support compress
* [Feature][Connector-V2][File] Update e2e tests
* [Feature][Connector-V2][File] Update docs
* [Improve][Connector-V2][File] Update docs
* [Improve][Connector-V2][File] Update option rule
---
docs/en/connector-v2/sink/FtpFile.md | 15 +++++-
docs/en/connector-v2/sink/HdfsFile.md | 17 ++++--
docs/en/connector-v2/sink/LocalFile.md | 13 +++++
docs/en/connector-v2/sink/OssFile.md | 15 +++++-
docs/en/connector-v2/sink/OssJindoFile.md | 25 +++++++--
docs/en/connector-v2/sink/S3File.md | 63 +++++++++++++---------
docs/en/connector-v2/sink/SftpFile.md | 19 ++++++-
.../seatunnel/file/config/BaseFileSinkConfig.java | 17 ++----
.../seatunnel/file/config/BaseSinkConfig.java | 25 +++++++--
.../seatunnel/file/config/CompressConfig.java | 22 --------
.../seatunnel/file/config/CompressFormat.java | 40 ++++++++++----
.../file/sink/writer/AbstractWriteStrategy.java | 6 +--
.../file/sink/writer/JsonWriteStrategy.java | 17 +++++-
.../file/sink/writer/OrcWriteStrategy.java | 4 +-
.../file/sink/writer/ParquetWriteStrategy.java | 5 +-
.../file/sink/writer/TextWriteStrategy.java | 45 +++++++---------
.../file/ftp/sink/FtpFileSinkFactory.java | 6 ++-
.../file/hdfs/sink/HdfsFileSinkFactory.java | 6 ++-
.../file/local/sink/LocalFileSinkFactory.java | 6 ++-
.../file/oss/sink/OssFileSinkFactory.java | 6 ++-
.../file/oss/sink/OssFileSinkFactory.java | 6 ++-
.../seatunnel/file/s3/sink/S3FileSinkFactory.java | 7 ++-
.../file/sftp/sink/SftpFileSinkFactory.java | 6 ++-
.../test/resources/orc/fake_to_local_file_orc.conf | 1 +
.../parquet/fake_to_local_file_parquet.conf | 1 +
.../resources/text/fake_to_local_file_text.conf | 1 +
26 files changed, 268 insertions(+), 126 deletions(-)
diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md
index 5d7c00996..5380c6967 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -49,6 +49,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
+| compress_codec | string | no | none | |
| common-options | object | no | - | |
### host [string]
@@ -157,6 +158,16 @@ Only support `true` now.
The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger.
+### compress_codec [string]
+
+The compression codec of files. The supported codecs for each file format are listed below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -224,4 +235,6 @@ FtpFile {
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed
-- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index 3790fe01b..dfb115ec4 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -50,6 +50,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
+| compress_codec | string | no | none | |
| kerberos_principal | string | no | - |
| kerberos_keytab_path | string | no | - | |
| compress_codec | string | no | none | |
@@ -153,6 +154,16 @@ Only support `true` now.
The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger.
+### compress_codec [string]
+
+The compression codec of files. The supported codecs for each file format are listed below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
### kerberos_principal [string]
The principal of kerberos
@@ -161,9 +172,6 @@ The principal of kerberos
The keytab path of kerberos
-### compressCodec [string]
-Support lzo compression for text in file format. The file name ends with ".lzo.txt" .
-
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
@@ -245,4 +253,5 @@ HdfsFile {
### Next version
- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
- [Improve] Support lzo compression for text in file format ([3782](https://github.com/apache/incubator-seatunnel/pull/3782))
-- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
\ No newline at end of file
+- [Improve] Support kerberos authentication ([3840](https://github.com/apache/incubator-seatunnel/pull/3840))
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md
index ab5753f27..9d9f4d2ac 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -45,6 +45,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
+| compress_codec | string | no | none | |
| common-options | object | no | - | |
### path [string]
@@ -137,6 +138,16 @@ Only support `true` now.
The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger.
+### compress_codec [string]
+
+The compression codec of files. The supported codecs for each file format are listed below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -206,3 +217,5 @@ LocalFile {
- When restore writer from states getting transaction directly failed
- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
index a45983ea7..5cef80d16 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -52,6 +52,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
+| compress_codec | string | no | none | |
| common-options | object | no | - | |
### path [string]
@@ -160,6 +161,16 @@ Only support `true` now.
The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger.
+### compress_codec [string]
+
+The compression codec of files. The supported codecs for each file format are listed below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -244,4 +255,6 @@ For orc file format simple config
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed
-- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md
index 92fb7131d..4c0ccb9a0 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -52,6 +52,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
+| compress_codec | string | no | none | |
| common-options | object | no | - | |
### path [string]
@@ -156,6 +157,20 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
Only support `true` now.
+### batch_size [int]
+
+The maximum number of rows in a file. For SeaTunnel Engine, the number of rows in a file is determined by `batch_size` and `checkpoint.interval` together. If `checkpoint.interval` is large enough, the sink writer will keep writing rows into a file until it holds more than `batch_size` rows. If `checkpoint.interval` is small, the sink writer will create a new file whenever a new checkpoint is triggered.
+
+### compress_codec [string]
+
+The compression codec of files. The supported codecs for each file format are listed below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -166,7 +181,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
```hocon
- OssFile {
+ OssJindoFile {
path="/seatunnel/sink"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxx"
@@ -192,7 +207,7 @@ For parquet file format with `sink_columns`
```hocon
- OssFile {
+ OssJindoFile {
path = "/seatunnel/sink"
bucket = "oss://tyrantlucifer-image-bed"
access_key = "xxxxxxxxxxx"
@@ -221,6 +236,10 @@ For orc file format simple config
## Changelog
+### 2.3.0 2022-12-30
+
+- Add OSS Jindo File Sink Connector
+
### Next version
-- Add OSS Jindo File Sink Connector
\ No newline at end of file
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index a9040e9bc..76ce51342 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -31,28 +31,29 @@ By default, we use 2PC commit to ensure `exactly-once`
## Options
-| name | type | required | default value | remarks |
-|-----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
-| path | string | yes | - | |
-| bucket | string | yes | - | |
-| fs.s3a.endpoint | string | yes | - | |
-| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | |
-| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
-| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
-| custom_filename | boolean | no | false | Whether you need custom the filename |
-| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
-| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
-| field_delimiter | string | no | '\001' | Only used when file_format is text |
-| row_delimiter | string | no | "\n" | Only used when file_format is text |
-| have_partition | boolean | no | false | Whether you need processing partitions. |
-| partition_by | array | no | - | Only used then have_partition is true |
-| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true |
-| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true |
-| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
-| is_enable_transaction | boolean | no | true | |
-| batch_size | int | no | 1000000 | |
-| common-options | object | no | - | |
+| name | type | required | default value | remarks |
+|----------------------------------|---------|----------|-------------------------------------------------------|--------------------------------------------------------------------------------------------------------|
+| path | string | yes | - | |
+| bucket | string | yes | - | |
+| fs.s3a.endpoint | string | yes | - | |
+| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider | |
+| access_key | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
+| access_secret | string | no | - | Only used when fs.s3a.aws.credentials.provider = org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider |
+| custom_filename | boolean | no | false | Whether you need custom the filename |
+| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
+| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
+| file_format | string | no | "csv" | |
+| field_delimiter | string | no | '\001' | Only used when file_format is text |
+| row_delimiter | string | no | "\n" | Only used when file_format is text |
+| have_partition | boolean | no | false | Whether you need processing partitions. |
+| partition_by | array | no | - | Only used then have_partition is true |
+| partition_dir_expression | string | no | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" | Only used then have_partition is true |
+| is_partition_field_write_in_file | boolean | no | false | Only used then have_partition is true |
+| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction | boolean | no | true | |
+| batch_size | int | no | 1000000 | |
+| compress_codec | string | no | none | |
+| common-options | object | no | - | |
### path [string]
@@ -174,6 +175,16 @@ Only support `true` now.
The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger.
+### compress_codec [string]
+
+The compression codec of files. The supported codecs for each file format are listed below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -254,7 +265,7 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden
- Add S3File Sink Connector
-### Next version
+### 2.3.0 2022-12-30
- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
- When field from upstream is null it will throw NullPointerException
- Sink columns mapping failed
@@ -264,4 +275,8 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden
- Allow the use of the s3a protocol
- Decouple hadoop-aws dependencies
- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
-- [Feature]Set S3 AK to optional ([3688](https://github.com/apache/incubator-seatunnel/pull/))
\ No newline at end of file
+- [Feature]Set S3 AK to optional ([3688](https://github.com/apache/incubator-seatunnel/pull/))
+
+### Next version
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md
index d71ea8b7d..4879cde7b 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -49,6 +49,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| sink_columns | array | no | | When this parameter is empty, all fields are sink columns |
| is_enable_transaction | boolean | no | true | |
| batch_size | int | no | 1000000 | |
+| compress_codec | string | no | none | |
| common-options | object | no | - | |
### host [string]
@@ -157,6 +158,16 @@ Only support `true` now.
The maximum number of rows in a file. For SeaTunnel Engine, the number of lines in the file is determined by `batch_size` and `checkpoint.interval` jointly decide. If the value of `checkpoint.interval` is large enough, sink writer will write rows in a file until the rows in the file larger than `batch_size`. If `checkpoint.interval` is small, the sink writer will create a new file when a new checkpoint trigger.
+### compress_codec [string]
+
+The compression codec of files. The supported codecs for each file format are listed below:
+
+- txt: `lzo` `none`
+- json: `lzo` `none`
+- csv: `lzo` `none`
+- orc: `lzo` `snappy` `lz4` `zlib` `none`
+- parquet: `lzo` `snappy` `lz4` `gzip` `brotli` `zstd` `none`
+
### common options
Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -191,7 +202,7 @@ SftpFile {
## Changelog
-### Next version
+### 2.3.0 2022-12-30
- Add SftpFile Sink Connector
- [BugFix] Fixed the following bugs that failed to write data to files ([3258](https://github.com/apache/incubator-seatunnel/pull/3258))
@@ -199,4 +210,8 @@ SftpFile {
- Sink columns mapping failed
- When restore writer from states getting transaction directly failed
-- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
\ No newline at end of file
+- [Improve] Support setting batch size for every file ([3625](https://github.com/apache/incubator-seatunnel/pull/3625))
+
+### Next version
+
+- [Improve] Support file compress ([3899](https://github.com/apache/incubator-seatunnel/pull/3899))
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index 57e8079cb..c38f43ae9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -19,11 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.file.config;
import static com.google.common.base.Preconditions.checkNotNull;
-import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -35,9 +33,9 @@ import java.io.Serializable;
import java.util.Locale;
@Data
-public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Serializable {
+public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
private static final long serialVersionUID = 1L;
- protected String compressCodec;
+ protected CompressFormat compressFormat = BaseSinkConfig.COMPRESS_CODEC.defaultValue();
protected String fieldDelimiter = BaseSinkConfig.FIELD_DELIMITER.defaultValue();
protected String rowDelimiter = BaseSinkConfig.ROW_DELIMITER.defaultValue();
protected int batchSize = BaseSinkConfig.BATCH_SIZE.defaultValue();
@@ -50,15 +48,8 @@ public class BaseFileSinkConfig implements DelimiterConfig, CompressConfig, Seri
public BaseFileSinkConfig(@NonNull Config config) {
if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
- CompressFormat compressFormat = CompressFormat.valueOf(config.getString(BaseSinkConfig.COMPRESS_CODEC.key()).toUpperCase(Locale.ROOT));
- switch (compressFormat) {
- case LZO:
- this.compressCodec = compressFormat.getCompressCodec();
- break;
- default:
- throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
- "Compress not supported this compress code by SeaTunnel file connector now");
- }
+ String compressCodec = config.getString(BaseSinkConfig.COMPRESS_CODEC.key());
+ this.compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase());
}
if (config.hasPath(BaseSinkConfig.BATCH_SIZE.key())) {
this.batchSize = config.getInt(BaseSinkConfig.BATCH_SIZE.key());
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 5abffc13f..572a49fd3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
+import java.util.Arrays;
import java.util.List;
public class BaseSinkConfig {
@@ -37,11 +38,29 @@ public class BaseSinkConfig {
public static final String DEFAULT_FILE_NAME_EXPRESSION = "${transactionId}";
public static final int DEFAULT_BATCH_SIZE = 1000000;
- public static final Option<String> COMPRESS_CODEC = Options.key("compress_codec")
- .stringType()
- .noDefaultValue()
+ public static final Option<CompressFormat> COMPRESS_CODEC = Options.key("compress_codec")
+ .enumType(CompressFormat.class)
+ .defaultValue(CompressFormat.NONE)
.withDescription("Compression codec");
+ public static final Option<CompressFormat> TXT_COMPRESS = Options.key("compress_codec")
+ .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO))
+ .defaultValue(CompressFormat.NONE)
+ .withDescription("Txt file supported compression");
+
+ public static final Option<CompressFormat> PARQUET_COMPRESS = Options.key("compress_codec")
+ .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO,
+ CompressFormat.SNAPPY, CompressFormat.LZ4, CompressFormat.GZIP,
+ CompressFormat.BROTLI, CompressFormat.ZSTD))
+ .defaultValue(CompressFormat.NONE)
+ .withDescription("Parquet file supported compression");
+
+ public static final Option<CompressFormat> ORC_COMPRESS = Options.key("compress_codec")
+ .singleChoice(CompressFormat.class, Arrays.asList(CompressFormat.NONE, CompressFormat.LZO,
+ CompressFormat.SNAPPY, CompressFormat.LZ4, CompressFormat.ZLIB))
+ .defaultValue(CompressFormat.NONE)
+ .withDescription("Orc file supported compression");
+
public static final Option<DateUtils.Formatter> DATE_FORMAT = Options.key("date_format")
.enumType(DateUtils.Formatter.class)
.defaultValue(DateUtils.Formatter.YYYY_MM_DD)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java
deleted file mode 100644
index 48d47c8d1..000000000
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressConfig.java
+++ /dev/null
@@ -1,22 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.config;
-
-public interface CompressConfig {
- String getCompressCodec();
-}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
index 6449f1845..6483ae9c8 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/CompressFormat.java
@@ -17,29 +17,49 @@
package org.apache.seatunnel.connectors.seatunnel.file.config;
+import org.apache.orc.CompressionKind;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+
import java.io.Serializable;
public enum CompressFormat implements Serializable {
+ // text json orc parquet support
+ LZO(".lzo", CompressionKind.LZO, CompressionCodecName.LZO),
+
+ // orc and parquet support
+ NONE("", CompressionKind.NONE, CompressionCodecName.UNCOMPRESSED),
+ SNAPPY(".snappy", CompressionKind.SNAPPY, CompressionCodecName.SNAPPY),
+ LZ4(".lz4", CompressionKind.LZ4, CompressionCodecName.LZ4),
+
+ // only orc support
+ ZLIB(".zlib", CompressionKind.ZLIB, CompressionCodecName.UNCOMPRESSED),
- LZO("lzo"),
- NONE("none");
+ // only parquet support
+ GZIP(".gz", CompressionKind.NONE, CompressionCodecName.GZIP),
+ BROTLI(".br", CompressionKind.NONE, CompressionCodecName.BROTLI),
+ ZSTD(".zstd", CompressionKind.NONE, CompressionCodecName.ZSTD);
private final String compressCodec;
+ private final CompressionKind orcCompression;
+ private final CompressionCodecName parquetCompression;
- CompressFormat(String compressCodec) {
+ CompressFormat(String compressCodec,
+ CompressionKind orcCompression,
+                   CompressionCodecName parquetCompression) {
this.compressCodec = compressCodec;
+ this.orcCompression = orcCompression;
+        this.parquetCompression = parquetCompression;
}
public String getCompressCodec() {
return compressCodec;
}
- public static CompressFormat getCompressFormat(String value) {
- switch (value) {
- case "lzo":
- return CompressFormat.LZO;
- default:
- return CompressFormat.NONE;
- }
+ public CompressionKind getOrcCompression() {
+ return orcCompression;
+ }
+
+ public CompressionCodecName getParquetCompression() {
+ return parquetCompression;
}
}
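A minimal usage sketch of the reworked enum (not part of the commit; the demo class and the hard-coded "snappy" value are hypothetical, and it assumes the orc and parquet artifacts imported above are on the classpath):

    // Same package as CompressFormat; mirrors how BaseFileSinkConfig resolves compress_codec.
    package org.apache.seatunnel.connectors.seatunnel.file.config;

    import java.util.Locale;

    public class CompressFormatDemo {
        public static void main(String[] args) {
            CompressFormat format = CompressFormat.valueOf("snappy".toUpperCase(Locale.ROOT));
            System.out.println(format.getCompressCodec());      // prints ".snappy", used as the file name suffix fragment
            System.out.println(format.getOrcCompression());     // prints SNAPPY (org.apache.orc.CompressionKind)
            System.out.println(format.getParquetCompression()); // prints SNAPPY (parquet CompressionCodecName)
        }
    }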
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 41bd8ab08..329af511a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -63,6 +63,7 @@ import java.util.stream.Collectors;
public abstract class AbstractWriteStrategy implements WriteStrategy {
protected final Logger log = LoggerFactory.getLogger(this.getClass());
protected final FileSinkConfig fileSinkConfig;
+ protected final CompressFormat compressFormat;
protected final List<Integer> sinkColumnsIndexInRow;
protected String jobId;
protected int subTaskIndex;
@@ -90,6 +91,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
this.fileSinkConfig = fileSinkConfig;
this.sinkColumnsIndexInRow = fileSinkConfig.getSinkColumnsIndexInRow();
this.batchSize = fileSinkConfig.getBatchSize();
+ this.compressFormat = fileSinkConfig.getCompressFormat();
}
/**
@@ -222,9 +224,7 @@ public abstract class AbstractWriteStrategy implements WriteStrategy {
String fileNameExpression = fileSinkConfig.getFileNameExpression();
FileFormat fileFormat = fileSinkConfig.getFileFormat();
String suffix = fileFormat.getSuffix();
- if (CompressFormat.LZO.getCompressCodec().equals(fileSinkConfig.getCompressCodec())) {
- suffix = "." + CompressFormat.LZO.getCompressCodec() + "." + suffix;
- }
+ suffix = compressFormat.getCompressCodec() + suffix;
if (StringUtils.isBlank(fileNameExpression)) {
return transactionId + suffix;
}
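With this change the compression fragment from the enum is prepended to the format suffix unconditionally; because NONE maps to an empty string, uncompressed file names are unchanged. An illustrative sketch of the resulting names (transaction ids are placeholders):

    // Resulting names for the default file_name_expression "${transactionId}":
    //   compressFormat = LZO,  format suffix ".txt" -> <transactionId>.lzo.txt
    //   compressFormat = NONE, format suffix ".txt" -> <transactionId>.txt
    //   compressFormat = ZLIB, format suffix ".orc" -> <transactionId>.zlib.orc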
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index 9ae10fd35..7d9b7ad0d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -25,10 +25,12 @@ import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorExc
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.json.JsonSerializationSchema;
+import io.airlift.compress.lzo.LzopCodec;
import lombok.NonNull;
import org.apache.hadoop.fs.FSDataOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.util.HashMap;
import java.util.Map;
@@ -93,7 +95,20 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
if (fsDataOutputStream == null) {
try {
- fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+ switch (compressFormat) {
+ case LZO:
+ LzopCodec lzo = new LzopCodec();
+ OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
+ fsDataOutputStream = new FSDataOutputStream(out, null);
+ break;
+ case NONE:
+ fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+ break;
+ default:
+ log.warn("Json file does not support this compress type: {}", compressFormat.getCompressCodec());
+ fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+ break;
+ }
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
isFirstWrite.put(filePath, true);
} catch (IOException e) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index af5b774e9..507dafdcc 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -30,7 +30,6 @@ import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig
import lombok.NonNull;
import org.apache.hadoop.fs.Path;
-import org.apache.orc.CompressionKind;
import org.apache.orc.OrcFile;
import org.apache.orc.TypeDescription;
import org.apache.orc.Writer;
@@ -113,8 +112,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
try {
OrcFile.WriterOptions options = OrcFile.writerOptions(getConfiguration(hadoopConf))
.setSchema(schema)
- // temporarily used snappy
- .compress(CompressionKind.SNAPPY)
+ .compress(compressFormat.getOrcCompression())
// use orc version 0.12
.version(OrcFile.Version.V_0_12)
.overwrite(true);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index 6c431605a..ad05c123e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -42,7 +42,6 @@ import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.column.ParquetProperties;
import org.apache.parquet.hadoop.ParquetFileWriter;
import org.apache.parquet.hadoop.ParquetWriter;
-import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.util.HadoopOutputFile;
import org.apache.parquet.schema.ConversionPatterns;
import org.apache.parquet.schema.LogicalTypeAnnotation;
@@ -141,9 +140,7 @@ public class ParquetWriteStrategy extends AbstractWriteStrategy {
.withDataModel(dataModel)
// use parquet v1 to improve compatibility
.withWriterVersion(ParquetProperties.WriterVersion.PARQUET_1_0)
- // Temporarily use snappy compress
- // I think we can use the compress option in config to control this
- .withCompressionCodec(CompressionCodecName.SNAPPY)
+ .withCompressionCodec(compressFormat.getParquetCompression())
.withSchema(schema)
.build();
this.beingWrittenWriter.put(filePath, newWriter);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index b0ba66746..6698e2ee2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -24,7 +24,6 @@ import org.apache.seatunnel.common.exception.CommonErrorCode;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
-import org.apache.seatunnel.connectors.seatunnel.file.config.CompressFormat;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSinkConfig;
import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -36,7 +35,6 @@ import org.apache.hadoop.fs.FSDataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.HashMap;
-import java.util.Locale;
import java.util.Map;
public class TextWriteStrategy extends AbstractWriteStrategy {
@@ -48,18 +46,16 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
private final DateTimeUtils.Formatter dateTimeFormat;
private final TimeUtils.Formatter timeFormat;
private SerializationSchema serializationSchema;
- private String compressCodec;
- public TextWriteStrategy(FileSinkConfig textFileSinkConfig) {
- super(textFileSinkConfig);
+ public TextWriteStrategy(FileSinkConfig fileSinkConfig) {
+ super(fileSinkConfig);
this.beingWrittenOutputStream = new HashMap<>();
this.isFirstWrite = new HashMap<>();
- this.fieldDelimiter = textFileSinkConfig.getFieldDelimiter();
- this.rowDelimiter = textFileSinkConfig.getRowDelimiter();
- this.dateFormat = textFileSinkConfig.getDateFormat();
- this.dateTimeFormat = textFileSinkConfig.getDatetimeFormat();
- this.timeFormat = textFileSinkConfig.getTimeFormat();
- this.compressCodec = textFileSinkConfig.getCompressCodec();
+ this.fieldDelimiter = fileSinkConfig.getFieldDelimiter();
+ this.rowDelimiter = fileSinkConfig.getRowDelimiter();
+ this.dateFormat = fileSinkConfig.getDateFormat();
+ this.dateTimeFormat = fileSinkConfig.getDatetimeFormat();
+ this.timeFormat = fileSinkConfig.getTimeFormat();
}
@Override
@@ -117,21 +113,20 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
FSDataOutputStream fsDataOutputStream = beingWrittenOutputStream.get(filePath);
if (fsDataOutputStream == null) {
try {
- if (compressCodec != null) {
- CompressFormat compressFormat = CompressFormat.valueOf(compressCodec.toUpperCase(Locale.ROOT));
- switch (compressFormat) {
- case LZO:
- LzopCodec lzo = new LzopCodec();
- OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
- fsDataOutputStream = new FSDataOutputStream(out, null);
- break;
- default:
- fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
- }
- } else {
- fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+ switch (compressFormat) {
+ case LZO:
+ LzopCodec lzo = new LzopCodec();
+ OutputStream out = lzo.createOutputStream(fileSystemUtils.getOutputStream(filePath));
+ fsDataOutputStream = new FSDataOutputStream(out, null);
+ break;
+ case NONE:
+ fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+ break;
+ default:
+ log.warn("Text file does not support this compress type: {}", compressFormat.getCompressCodec());
+ fsDataOutputStream = fileSystemUtils.getOutputStream(filePath);
+ break;
}
-
beingWrittenOutputStream.put(filePath, fsDataOutputStream);
isFirstWrite.put(filePath, true);
} catch (IOException e) {
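The text and json writers now wrap the raw output stream the same way through aircompressor's lzop codec; a standalone sketch of that wrapping outside SeaTunnel (the output path and the sample line are placeholders):

    import io.airlift.compress.lzo.LzopCodec;

    import java.io.FileOutputStream;
    import java.io.OutputStream;
    import java.nio.charset.StandardCharsets;

    public class LzoStreamSketch {
        public static void main(String[] args) throws Exception {
            LzopCodec codec = new LzopCodec();
            // Everything written to "out" is lzop-compressed, matching the ".lzo.txt" files the sink produces.
            try (OutputStream out = codec.createOutputStream(new FileOutputStream("/tmp/demo.lzo.txt"))) {
                out.write("hello seatunnel\n".getBytes(StandardCharsets.UTF_8));
            }
        }
    }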
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index d5a246b61..35e611ced 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -44,7 +44,11 @@ public class FtpFileSinkFactory implements TableSinkFactory {
.required(FtpConfig.FTP_PASSWORD)
.optional(BaseSinkConfig.FILE_FORMAT)
.conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
- BaseSinkConfig.FIELD_DELIMITER)
+ BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index d7a27b5cc..68fc6e749 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -41,7 +41,11 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
.required(BaseSinkConfig.FILE_PATH)
.optional(BaseSinkConfig.FILE_FORMAT)
.conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
- BaseSinkConfig.FIELD_DELIMITER)
+ BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index dfc14884a..bf7390c16 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -39,7 +39,11 @@ public class LocalFileSinkFactory implements TableSinkFactory {
.required(BaseSinkConfig.FILE_PATH)
.optional(BaseSinkConfig.FILE_FORMAT)
.conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
- BaseSinkConfig.FIELD_DELIMITER)
+ BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 55b27ffa1..8495d95b9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -44,7 +44,11 @@ public class OssFileSinkFactory implements TableSinkFactory {
.required(OssConfig.ENDPOINT)
.optional(BaseSinkConfig.FILE_FORMAT)
.conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
- BaseSinkConfig.FIELD_DELIMITER)
+ BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 55b27ffa1..8495d95b9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -44,7 +44,11 @@ public class OssFileSinkFactory implements TableSinkFactory {
.required(OssConfig.ENDPOINT)
.optional(BaseSinkConfig.FILE_FORMAT)
.conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
- BaseSinkConfig.FIELD_DELIMITER)
+ BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 01a3c55af..3a0a95b41 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -44,7 +44,12 @@ public class S3FileSinkFactory implements TableSinkFactory {
.conditional(S3Config.S3A_AWS_CREDENTIALS_PROVIDER, S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider, S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
.optional(S3Config.S3_PROPERTIES)
.optional(BaseSinkConfig.FILE_FORMAT)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER, BaseSinkConfig.FIELD_DELIMITER)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+ BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT)
.optional(BaseSinkConfig.HAVE_PARTITION)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index f2be6d2d0..bc61079fe 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -44,7 +44,11 @@ public class SftpFileSinkFactory implements TableSinkFactory {
.required(SftpConfig.SFTP_PASSWORD)
.optional(BaseSinkConfig.FILE_FORMAT)
.conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
- BaseSinkConfig.FIELD_DELIMITER)
+ BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
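The net effect of the factory changes is that each file_format only exposes the codec subset defined for it; a hedged summary (validation behavior is assumed from the singleChoice definitions above, not verified here):

    // file_format = "text" / "csv" / "json" -> compress_codec checked against TXT_COMPRESS     (none, lzo)
    // file_format = "orc"                   -> compress_codec checked against ORC_COMPRESS     (none, lzo, snappy, lz4, zlib)
    // file_format = "parquet"               -> compress_codec checked against PARQUET_COMPRESS (none, lzo, snappy, lz4, gzip, brotli, zstd)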
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
index cbbc227b3..a134e9fc0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
@@ -75,5 +75,6 @@ sink {
file_format = "orc"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
+ compress_codec = "zlib"
}
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
index 454e27f41..c3eae33b0 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
@@ -75,5 +75,6 @@ sink {
file_format = "parquet"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
+ compress_codec = "gzip"
}
}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
index 54227c8ee..795d82234 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
@@ -75,5 +75,6 @@ sink {
file_format = "text"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
+ compress_codec = "lzo"
}
}
\ No newline at end of file