You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by we...@apache.org on 2023/03/03 07:57:48 UTC
[incubator-seatunnel] branch cdc-multiple-table updated: Change file type to file_format_type in file source/sink (#4249)
This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
new 0a10c40de Change file type to file_format_type in file source/sink (#4249)
0a10c40de is described below
commit 0a10c40de4daa95ffd411cdcff112d7f0026c1d2
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Mar 3 15:57:42 2023 +0800
Change file type to file_format_type in file source/sink (#4249)
---
docs/en/connector-v2/sink/HdfsFile.md | 10 +++++-----
docs/en/connector-v2/sink/OssFile.md | 12 ++++++------
docs/en/connector-v2/sink/OssJindoFile.md | 12 ++++++------
docs/en/connector-v2/sink/S3-Redshift.md | 12 ++++++------
docs/en/connector-v2/sink/S3File.md | 12 ++++++------
docs/en/connector-v2/sink/SftpFile.md | 8 ++++----
docs/en/connector-v2/source/FtpFile.md | 8 ++++----
docs/en/connector-v2/source/HdfsFile.md | 8 ++++----
docs/en/connector-v2/source/LocalFile.md | 10 +++++-----
docs/en/connector-v2/source/OssFile.md | 10 +++++-----
docs/en/connector-v2/source/OssJindoFile.md | 10 +++++-----
docs/en/connector-v2/source/S3File.md | 10 +++++-----
docs/en/connector-v2/source/SftpFile.md | 8 ++++----
.../seatunnel/file/hdfs/source/BaseHdfsFileSource.java | 6 +++---
.../seatunnel/file/config/BaseFileSinkConfig.java | 6 +++---
.../seatunnel/file/config/BaseSinkConfig.java | 4 ++--
.../seatunnel/file/config/BaseSourceConfig.java | 4 ++--
.../seatunnel/file/source/reader/TextReadStrategy.java | 2 +-
.../seatunnel/file/ftp/sink/FtpFileSinkFactory.java | 12 ++++++------
.../seatunnel/file/ftp/source/FtpFileSource.java | 6 +++---
.../file/ftp/source/FtpFileSourceFactory.java | 6 +++---
.../seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java | 12 ++++++------
.../file/hdfs/source/HdfsFileSourceFactory.java | 6 +++---
.../file/local/sink/LocalFileSinkFactory.java | 12 ++++++------
.../seatunnel/file/local/source/LocalFileSource.java | 6 +++---
.../file/local/source/LocalFileSourceFactory.java | 6 +++---
.../seatunnel/file/oss/sink/OssFileSinkFactory.java | 12 ++++++------
.../seatunnel/file/oss/source/OssFileSource.java | 6 +++---
.../file/oss/source/OssFileSourceFactory.java | 6 +++---
.../seatunnel/file/oss/sink/OssFileSinkFactory.java | 12 ++++++------
.../seatunnel/file/oss/source/OssFileSource.java | 6 +++---
.../file/oss/source/OssFileSourceFactory.java | 6 +++---
.../seatunnel/file/s3/catalog/S3Catalog.java | 4 ++--
.../seatunnel/file/s3/catalog/S3DataTypeConvertor.java | 1 -
.../seatunnel/file/s3/sink/S3FileSinkFactory.java | 12 ++++++------
.../seatunnel/file/s3/source/S3FileSource.java | 6 +++---
.../seatunnel/file/s3/source/S3FileSourceFactory.java | 6 +++---
.../seatunnel/file/sftp/sink/SftpFileSinkFactory.java | 12 ++++++------
.../seatunnel/file/sftp/source/SftpFileSource.java | 6 +++---
.../file/sftp/source/SftpFileSourceFactory.java | 6 +++---
.../connectors/seatunnel/hive/sink/HiveSink.java | 8 ++++----
.../connectors/seatunnel/hive/source/HiveSource.java | 6 +++---
.../catalog/redshift/RedshiftDataTypeConvertor.java | 18 ++++++++++++++++++
.../seatunnel/redshift/sink/S3RedshiftFactory.java | 2 +-
44 files changed, 185 insertions(+), 168 deletions(-)
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index dfb115ec4..0e7589fa4 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -20,7 +20,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -40,7 +40,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -95,7 +95,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -196,7 +196,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
HdfsFile {
fs.defaultFS = "hdfs://hadoopcluster"
path = "/tmp/hive/warehouse/test2"
- file_format = "text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
@@ -226,7 +226,7 @@ HdfsFile {
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
- file_format = "parquet"
+ file_format_type = "parquet"
sink_columns = ["name","age"]
is_enable_transaction = true
}
diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
index 5cef80d16..3ccdcf41e 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -23,7 +23,7 @@ It only supports hadoop version **2.9.X+**.
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -102,7 +102,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -187,7 +187,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
@@ -217,7 +217,7 @@ For parquet file format with `have_partition` and `sink_columns`
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
- file_format = "parquet"
+ file_format_type = "parquet"
sink_columns = ["name","age"]
}
@@ -233,7 +233,7 @@ For orc file format simple config
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "orc"
+ file_format_type = "orc"
}
```
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md
index 4c0ccb9a0..23f02c7b7 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -23,7 +23,7 @@ It only supports hadoop version **2.9.X+**.
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -102,7 +102,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -187,7 +187,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
@@ -213,7 +213,7 @@ For parquet file format with `sink_columns`
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "parquet"
+ file_format_type = "parquet"
sink_columns = ["name","age"]
}
@@ -229,7 +229,7 @@ For orc file format simple config
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "orc"
+ file_format_type = "orc"
}
```
diff --git a/docs/en/connector-v2/sink/S3-Redshift.md b/docs/en/connector-v2/sink/S3-Redshift.md
index 7a519b770..41eeb514a 100644
--- a/docs/en/connector-v2/sink/S3-Redshift.md
+++ b/docs/en/connector-v2/sink/S3-Redshift.md
@@ -18,7 +18,7 @@ Output data to AWS Redshift.
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -39,7 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| access_secret | string | no | - |
| hadoop_s3_properties | map | no | - |
| file_name_expression | string | no | "${transactionId}" |
-| file_format | string | no | "text" |
+| file_format_type | string | no | "text" |
| filename_time_format | string | no | "yyyy.MM.dd" |
| field_delimiter | string | no | '\001' |
| row_delimiter | string | no | "\n" |
@@ -117,7 +117,7 @@ If you need to add a other option, you could add it here and refer to this [Hado
Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file.
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -205,7 +205,7 @@ For text file format
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
- file_format="text"
+ file_format_type = "text"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
@@ -233,7 +233,7 @@ For parquet file format
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
- file_format="parquet"
+ file_format_type = "parquet"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
@@ -261,7 +261,7 @@ For orc file format
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
- file_format="orc"
+ file_format_type = "orc"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index 76ce51342..fbeae0bee 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -22,7 +22,7 @@ To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -116,7 +116,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -201,7 +201,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
path="/seatunnel/text"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
- file_format="text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
@@ -233,7 +233,7 @@ For parquet file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCr
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
- file_format="parquet"
+ file_format_type = "parquet"
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
@@ -254,7 +254,7 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
- file_format="orc"
+ file_format_type = "orc"
}
```
diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md
index 4879cde7b..6a807e677 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -20,7 +20,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -39,7 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -99,7 +99,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -184,7 +184,7 @@ SftpFile {
username = "username"
password = "password"
path = "/data/sftp"
- file_format = "text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 6f774e79f..22006ec45 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] json
@@ -36,7 +36,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| user | string | yes | - |
| password | string | yes | - |
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
@@ -124,7 +124,7 @@ then Seatunnel will skip the first 2 lines from source files
The schema information of upstream data.
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -216,7 +216,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
port = 21
user = tyrantlucifer
password = tianchao
- type = "text"
+ file_format_type = "text"
schema = {
name = string
age = int
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 240f89146..601e487d0 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -25,7 +25,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format file
- [x] text
- [x] csv
- [x] parquet
@@ -37,7 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| fs.defaultFS | string | yes | - |
| hdfs_site_path | string | no | - |
| delimiter | string | no | \001 |
@@ -109,7 +109,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -230,7 +230,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
HdfsFile {
path = "/apps/hive/demo/student"
- type = "parquet"
+ file_format_type = "parquet"
fs.defaultFS = "hdfs://namenode001"
}
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index 33acf7c90..7aa65360d 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -25,7 +25,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -37,7 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
@@ -105,7 +105,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -209,7 +209,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
LocalFile {
path = "/apps/hive/demo/student"
- type = "parquet"
+ file_format_type = "parquet"
}
```
@@ -224,7 +224,7 @@ LocalFile {
}
}
path = "/apps/hive/demo/student"
- type = "json"
+ file_format_type = "json"
}
```
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index 038080961..8f619c0e8 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -28,7 +28,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -40,7 +40,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| bucket | string | yes | - |
| access_key | string | yes | - |
| access_secret | string | yes | - |
@@ -112,7 +112,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -236,7 +236,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- type = "orc"
+ file_format_type = "orc"
}
```
@@ -249,7 +249,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- type = "json"
+ file_format_type = "json"
schema {
fields {
id = int
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index b2ecd6d96..ada030e14 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -28,7 +28,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -40,7 +40,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| bucket | string | yes | - |
| access_key | string | yes | - |
| access_secret | string | yes | - |
@@ -112,7 +112,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -236,7 +236,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- type = "orc"
+ file_format_type = "orc"
}
```
@@ -249,7 +249,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- type = "json"
+ file_format_type = "json"
schema {
fields {
id = int
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index 15d0dbd9f..a423b7e8f 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -27,7 +27,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -39,7 +39,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------------|---------|----------|-------------------------------------------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| bucket | string | yes | - |
| fs.s3a.endpoint | string | yes | - |
| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider |
@@ -121,7 +121,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -251,7 +251,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3a://seatunnel-test"
- type = "orc"
+ file_format_type = "orc"
}
```
@@ -263,7 +263,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
bucket = "s3a://seatunnel-test"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
- type = "json"
+ file_format_type = "json"
schema {
fields {
id = int
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index 8108894f4..9501941d7 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] json
@@ -36,7 +36,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| user | string | yes | - |
| password | string | yes | - |
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
@@ -124,7 +124,7 @@ then Seatunnel will skip the first 2 lines from source files
The schema information of upstream data.
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -216,7 +216,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
port = 21
user = tyrantlucifer
password = tianchao
- type = "text"
+ file_format_type = "text"
schema = {
name = string
age = int
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 98500d620..e47cfd4cc 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -42,13 +42,13 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HdfsSourceConfig.FILE_PATH.key(),
- HdfsSourceConfig.FILE_TYPE.key(), HdfsSourceConfig.DEFAULT_FS.key());
+ HdfsSourceConfig.FILE_FORMAT_TYPE.key(), HdfsSourceConfig.DEFAULT_FS.key());
if (!result.isSuccess()) {
throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()));
+ readStrategy = ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key());
hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key()));
@@ -68,7 +68,7 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
}
// support user-defined schema
- FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index c38f43ae9..b57e94b03 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -73,9 +73,9 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
this.fileNameExpression = config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key());
}
- if (config.hasPath(BaseSinkConfig.FILE_FORMAT.key()) &&
- !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT.key()))) {
- this.fileFormat = FileFormat.valueOf(config.getString(BaseSinkConfig.FILE_FORMAT.key()).toUpperCase(Locale.ROOT));
+ if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key()) &&
+ !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
+ this.fileFormat = FileFormat.valueOf(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()).toUpperCase(Locale.ROOT));
}
if (config.hasPath(BaseSinkConfig.DATE_FORMAT.key())) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 572a49fd3..0e0851744 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -138,10 +138,10 @@ public class BaseSinkConfig {
.defaultValue(DateUtils.Formatter.YYYY_MM_DD_SPOT.getValue())
.withDescription("Only used when `custom_filename` is true. The time format of the path");
- public static final Option<FileFormat> FILE_FORMAT = Options.key("file_format")
+ public static final Option<FileFormat> FILE_FORMAT_TYPE = Options.key("file_format_type")
.enumType(FileFormat.class)
.defaultValue(FileFormat.CSV)
- .withDescription("File format type");
+ .withDescription("File format type, e.g. csv, orc, parquet, text");
public static final Option<List<String>> SINK_COLUMNS = Options.key("sink_columns")
.listType()
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 9ddf90f8a..9d9942123 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -26,10 +26,10 @@ import org.apache.seatunnel.common.utils.TimeUtils;
import java.util.List;
public class BaseSourceConfig {
- public static final Option<FileFormat> FILE_TYPE = Options.key("type")
+ public static final Option<FileFormat> FILE_FORMAT_TYPE = Options.key("file_format_type")
.objectType(FileFormat.class)
.noDefaultValue()
- .withDescription("File type");
+ .withDescription("File format type, e.g. json, csv, text, parquet, orc, avro....");
public static final Option<String> FILE_PATH = Options.key("path")
.stringType()
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index e8df4655d..a401a8e43 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -99,7 +99,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER.key())) {
fieldDelimiter = pluginConfig.getString(BaseSourceConfig.DELIMITER.key());
} else {
- FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
if (fileFormat == FileFormat.CSV) {
fieldDelimiter = ",";
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index 35e611ced..b26c29f62 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -42,13 +42,13 @@ public class FtpFileSinkFactory implements TableSinkFactory {
.required(FtpConfig.FTP_PORT)
.required(FtpConfig.FTP_USERNAME)
.required(FtpConfig.FTP_PASSWORD)
- .optional(BaseSinkConfig.FILE_FORMAT)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index ddcd2d36d..b19322c08 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -51,19 +51,19 @@ public class FtpFileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
- FtpConfig.FILE_PATH.key(), FtpConfig.FILE_TYPE.key(),
+ FtpConfig.FILE_PATH.key(), FtpConfig.FILE_FORMAT_TYPE.key(),
FtpConfig.FTP_HOST.key(), FtpConfig.FTP_PORT.key(),
FtpConfig.FTP_USERNAME.key(), FtpConfig.FTP_PASSWORD.key());
if (!result.isSuccess()) {
throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg())); }
- FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(FtpConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(FtpConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
"Ftp file source connector only support read [text, csv, json] files");
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_TYPE.key()));
+ readStrategy = ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(FtpConfig.FILE_PATH.key());
hadoopConf = FtpConf.buildWithConfig(pluginConfig);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index c67b55cfa..1e011fae2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -46,9 +46,9 @@ public class FtpFileSourceFactory implements TableSourceFactory {
.required(FtpConfig.FTP_PORT)
.required(FtpConfig.FTP_USERNAME)
.required(FtpConfig.FTP_PASSWORD)
- .required(FtpConfig.FILE_TYPE)
- .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
- .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+ .required(FtpConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
.optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index 68fc6e749..e30611a08 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -39,13 +39,13 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
return OptionRule.builder()
.required(HdfsSourceConfig.DEFAULT_FS)
.required(BaseSinkConfig.FILE_PATH)
- .optional(BaseSinkConfig.FILE_FORMAT)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 393c5c487..11aaa71d7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -43,9 +43,9 @@ public class HdfsFileSourceFactory implements TableSourceFactory {
return OptionRule.builder()
.required(HdfsSourceConfig.FILE_PATH)
.required(HdfsSourceConfig.DEFAULT_FS)
- .required(BaseSourceConfig.FILE_TYPE)
- .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
- .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
.optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index bf7390c16..6c84323b3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -37,13 +37,13 @@ public class LocalFileSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(BaseSinkConfig.FILE_PATH)
- .optional(BaseSinkConfig.FILE_FORMAT)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 7ce2777b0..1c20a0811 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -53,13 +53,13 @@ public class LocalFileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, LocalSourceConfig.FILE_PATH.key(),
- LocalSourceConfig.FILE_TYPE.key());
+ LocalSourceConfig.FILE_FORMAT_TYPE.key());
if (!result.isSuccess()) {
throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()));
+ readStrategy = ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(LocalSourceConfig.FILE_PATH.key());
hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
@@ -70,7 +70,7 @@ public class LocalFileSource extends BaseFileSource {
throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
}
// support user-defined schema
- FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 69f515bb1..3b982e276 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -42,9 +42,9 @@ public class LocalFileSourceFactory implements TableSourceFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(LocalSourceConfig.FILE_PATH)
- .required(BaseSourceConfig.FILE_TYPE)
- .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
- .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
.optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 8495d95b9..f44376f51 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -42,13 +42,13 @@ public class OssFileSinkFactory implements TableSinkFactory {
.required(OssConfig.ACCESS_KEY)
.required(OssConfig.ACCESS_SECRET)
.required(OssConfig.ENDPOINT)
- .optional(BaseSinkConfig.FILE_FORMAT)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 8c974ac67..dc52bcc7a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -52,7 +52,7 @@ public class OssFileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
- OssConfig.FILE_PATH.key(), OssConfig.FILE_TYPE.key(),
+ OssConfig.FILE_PATH.key(), OssConfig.FILE_FORMAT_TYPE.key(),
OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(),
OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key());
if (!result.isSuccess()) {
@@ -60,7 +60,7 @@ public class OssFileSource extends BaseFileSource {
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
+ readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
hadoopConf = OssConf.buildWithConfig(pluginConfig);
@@ -71,7 +71,7 @@ public class OssFileSource extends BaseFileSource {
throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
}
// support user-defined schema
- FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 721b55f0f..a9b7390f6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -46,9 +46,9 @@ public class OssFileSourceFactory implements TableSourceFactory {
.required(OssConfig.ACCESS_KEY)
.required(OssConfig.ACCESS_SECRET)
.required(OssConfig.ENDPOINT)
- .required(BaseSourceConfig.FILE_TYPE)
- .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
- .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
.optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 8495d95b9..f44376f51 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -42,13 +42,13 @@ public class OssFileSinkFactory implements TableSinkFactory {
.required(OssConfig.ACCESS_KEY)
.required(OssConfig.ACCESS_SECRET)
.required(OssConfig.ENDPOINT)
- .optional(BaseSinkConfig.FILE_FORMAT)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 83d981996..cc9c80f3c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -51,7 +51,7 @@ public class OssFileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
- OssConfig.FILE_PATH.key(), OssConfig.FILE_TYPE.key(),
+ OssConfig.FILE_PATH.key(), OssConfig.FILE_FORMAT_TYPE.key(),
OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(),
OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key());
if (!result.isSuccess()) {
@@ -59,7 +59,7 @@ public class OssFileSource extends BaseFileSource {
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
+ readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
hadoopConf = OssConf.buildWithConfig(pluginConfig);
@@ -70,7 +70,7 @@ public class OssFileSource extends BaseFileSource {
throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
}
// support user-defined schema
- FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 721b55f0f..a9b7390f6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -46,9 +46,9 @@ public class OssFileSourceFactory implements TableSourceFactory {
.required(OssConfig.ACCESS_KEY)
.required(OssConfig.ACCESS_SECRET)
.required(OssConfig.ENDPOINT)
- .required(BaseSourceConfig.FILE_TYPE)
- .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
- .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
.optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
index b8cc5aa00..fdeb41b7e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
@@ -69,10 +69,10 @@ public class S3Catalog implements Catalog {
@Override
public void open() throws CatalogException {
- ReadStrategy readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+ ReadStrategy readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(s3Config);
this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key());
- readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+ readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(s3Config);
try {
fileSystem = FileSystem.get(readStrategy.getConfiguration(S3Conf.buildWithConfig(s3Config)));
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
index 4c245d161..9c2827f15 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import com.google.auto.service.AutoService;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 3a0a95b41..b9c598b89 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -43,13 +43,13 @@ public class S3FileSinkFactory implements TableSinkFactory {
.required(S3Config.S3A_AWS_CREDENTIALS_PROVIDER)
.conditional(S3Config.S3A_AWS_CREDENTIALS_PROVIDER, S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider, S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
.optional(S3Config.S3_PROPERTIES)
- .optional(BaseSinkConfig.FILE_FORMAT)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT)
.optional(BaseSinkConfig.HAVE_PARTITION)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index 12f0afbfc..ce93542e7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -51,13 +51,13 @@ public class S3FileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
- S3Config.FILE_PATH.key(), S3Config.FILE_TYPE.key(), S3Config.S3_BUCKET.key());
+ S3Config.FILE_PATH.key(), S3Config.FILE_FORMAT_TYPE.key(), S3Config.S3_BUCKET.key());
if (!result.isSuccess()) {
throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_TYPE.key()));
+ readStrategy = ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(S3Config.FILE_PATH.key());
hadoopConf = S3Conf.buildWithConfig(pluginConfig);
@@ -68,7 +68,7 @@ public class S3FileSource extends BaseFileSource {
throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
}
// support user-defined schema
- FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE.key()).toUpperCase());
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_FORMAT_TYPE.key()).toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index d2f9d429f..78800f0cb 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -42,7 +42,7 @@ public class S3FileSourceFactory implements TableSourceFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(S3Config.FILE_PATH)
- .required(S3Config.FILE_TYPE)
+ .required(S3Config.FILE_FORMAT_TYPE)
.required(S3Config.S3_BUCKET)
.required(S3Config.FS_S3A_ENDPOINT)
.required(S3Config.S3A_AWS_CREDENTIALS_PROVIDER)
@@ -50,8 +50,8 @@ public class S3FileSourceFactory implements TableSourceFactory {
S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider, S3Config.S3_ACCESS_KEY,
S3Config.S3_SECRET_KEY)
.optional(S3Config.S3_PROPERTIES)
- .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
- .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
.optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index bc61079fe..5bf8bcf70 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -42,13 +42,13 @@ public class SftpFileSinkFactory implements TableSinkFactory {
.required(SftpConfig.SFTP_PORT)
.required(SftpConfig.SFTP_USERNAME)
.required(SftpConfig.SFTP_PASSWORD)
- .optional(BaseSinkConfig.FILE_FORMAT)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
- .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
.conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index b4d2ab23e..359f3b384 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -51,7 +51,7 @@ public class SftpFileSource extends BaseFileSource {
@Override
public void prepare(Config pluginConfig) throws PrepareFailException {
CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
- SftpConfig.FILE_PATH.key(), SftpConfig.FILE_TYPE.key(),
+ SftpConfig.FILE_PATH.key(), SftpConfig.FILE_FORMAT_TYPE.key(),
SftpConfig.SFTP_HOST.key(), SftpConfig.SFTP_PORT.key(),
SftpConfig.SFTP_USERNAME.key(), SftpConfig.SFTP_PASSWORD.key());
if (!result.isSuccess()) {
@@ -59,12 +59,12 @@ public class SftpFileSource extends BaseFileSource {
String.format("PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
- FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(SftpConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(SftpConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
"Sftp file source connector only support read [text, csv, json] files");
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_TYPE.key()));
+ readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(SftpConfig.FILE_PATH.key());
hadoopConf = SftpConf.buildWithConfig(pluginConfig);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index fa434d685..d80b80aae 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -46,9 +46,9 @@ public class SftpFileSourceFactory implements TableSourceFactory {
.required(SftpConfig.SFTP_PORT)
.required(SftpConfig.SFTP_USERNAME)
.required(SftpConfig.SFTP_PASSWORD)
- .required(BaseSourceConfig.FILE_TYPE)
- .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
- .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
.optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 09a6d85d7..f90db5b83 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -18,7 +18,7 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
-import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_PATH;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE;
@@ -103,13 +103,13 @@ public class HiveSink extends BaseHdfsFileSink {
String outputFormat = tableInformation.getSd().getOutputFormat();
if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
Map<String, String> parameters = tableInformation.getSd().getSerdeInfo().getParameters();
- pluginConfig = pluginConfig.withValue(FILE_FORMAT.key(), ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
+ pluginConfig = pluginConfig.withValue(FILE_FORMAT_TYPE.key(), ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
.withValue(FIELD_DELIMITER.key(), ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
.withValue(ROW_DELIMITER.key(), ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
} else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
- pluginConfig = pluginConfig.withValue(FILE_FORMAT.key(), ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
+ pluginConfig = pluginConfig.withValue(FILE_FORMAT_TYPE.key(), ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
} else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
- pluginConfig = pluginConfig.withValue(FILE_FORMAT.key(), ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
+ pluginConfig = pluginConfig.withValue(FILE_FORMAT_TYPE.key(), ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
} else {
throw new HiveConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
"Hive connector only support [text parquet orc] table now");
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 7c58e3c47..378cee682 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -89,13 +89,13 @@ public class HiveSource extends BaseHdfsFileSource {
tableInformation = tableInfo.getRight();
String inputFormat = tableInformation.getSd().getInputFormat();
if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
- pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE.key(),
+ pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
} else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
- pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE.key(),
+ pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
} else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
- pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE.key(),
+ pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
} else {
throw new HiveConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
index af463ce03..4b9f11cab 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
import static com.google.common.base.Preconditions.checkNotNull;
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
index 696fd03a3..b66421687 100644
--- a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
@@ -40,7 +40,7 @@ public class S3RedshiftFactory implements TableSinkFactory {
return OptionRule.builder()
.required(S3Config.S3_BUCKET, S3RedshiftConfig.JDBC_URL, S3RedshiftConfig.JDBC_USER, S3RedshiftConfig.JDBC_PASSWORD, S3RedshiftConfig.EXECUTE_SQL, BaseSourceConfig.FILE_PATH)
.optional(S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
- .optional(BaseSinkConfig.FILE_FORMAT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.optional(BaseSinkConfig.FILENAME_TIME_FORMAT)
.optional(BaseSinkConfig.FIELD_DELIMITER)
.optional(BaseSinkConfig.ROW_DELIMITER)