You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2023/03/04 03:27:46 UTC
[incubator-seatunnel] 04/04: Change file type to file_format_type in file source/sink (#4249)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
commit 0b8fb1a4aa6efef21c5e54ece5c78fb062abeb00
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Mar 3 15:57:42 2023 +0800
Change file type to file_format_type in file source/sink (#4249)
# Conflicts:
# docs/en/connector-v2/sink/OssFile.md
# docs/en/connector-v2/sink/OssJindoFile.md
# docs/en/connector-v2/sink/S3-Redshift.md
# docs/en/connector-v2/sink/SftpFile.md
# docs/en/connector-v2/source/FtpFile.md
# docs/en/connector-v2/source/LocalFile.md
# docs/en/connector-v2/source/SftpFile.md
# seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
# seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
# seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
# seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
# seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
# seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
# seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
# seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
# seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
# seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
# seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
# seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
# seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
# seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
# seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
# seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
# seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
---
docs/en/connector-v2/sink/HdfsFile.md | 10 +++++-----
docs/en/connector-v2/sink/OssFile.md | 12 ++++++------
docs/en/connector-v2/sink/OssJindoFile.md | 12 ++++++------
docs/en/connector-v2/sink/S3-Redshift.md | 12 ++++++------
docs/en/connector-v2/sink/S3File.md | 12 ++++++------
docs/en/connector-v2/sink/SftpFile.md | 8 ++++----
docs/en/connector-v2/source/FtpFile.md | 8 ++++----
docs/en/connector-v2/source/HdfsFile.md | 8 ++++----
docs/en/connector-v2/source/LocalFile.md | 10 +++++-----
docs/en/connector-v2/source/OssFile.md | 10 +++++-----
docs/en/connector-v2/source/OssJindoFile.md | 10 +++++-----
docs/en/connector-v2/source/S3File.md | 10 +++++-----
docs/en/connector-v2/source/SftpFile.md | 8 ++++----
.../file/hdfs/source/BaseHdfsFileSource.java | 9 ++++++---
.../seatunnel/file/config/BaseFileSinkConfig.java | 6 +++---
.../seatunnel/file/config/BaseSinkConfig.java | 6 +++---
.../seatunnel/file/config/BaseSourceConfig.java | 7 ++++---
.../file/source/reader/TextReadStrategy.java | 4 +++-
.../seatunnel/file/ftp/sink/FtpFileSinkFactory.java | 18 ++++++++++++------
.../seatunnel/file/ftp/source/FtpFileSource.java | 8 +++++---
.../file/ftp/source/FtpFileSourceFactory.java | 8 +++++---
.../file/hdfs/sink/HdfsFileSinkFactory.java | 18 ++++++++++++------
.../file/hdfs/source/HdfsFileSourceFactory.java | 8 +++++---
.../file/local/sink/LocalFileSinkFactory.java | 18 ++++++++++++------
.../seatunnel/file/local/source/LocalFileSource.java | 9 ++++++---
.../file/local/source/LocalFileSourceFactory.java | 8 +++++---
.../seatunnel/file/oss/sink/OssFileSinkFactory.java | 18 ++++++++++++------
.../seatunnel/file/oss/source/OssFileSource.java | 8 +++++---
.../file/oss/source/OssFileSourceFactory.java | 8 +++++---
.../seatunnel/file/oss/sink/OssFileSinkFactory.java | 18 ++++++++++++------
.../seatunnel/file/oss/source/OssFileSource.java | 8 +++++---
.../file/oss/source/OssFileSourceFactory.java | 8 +++++---
.../seatunnel/file/s3/catalog/S3Catalog.java | 4 ++--
.../seatunnel/file/s3/sink/S3FileSinkFactory.java | 18 ++++++++++++------
.../seatunnel/file/s3/source/S3FileSource.java | 8 +++++---
.../file/s3/source/S3FileSourceFactory.java | 8 +++++---
.../file/sftp/sink/SftpFileSinkFactory.java | 18 ++++++++++++------
.../seatunnel/file/sftp/source/SftpFileSource.java | 7 ++++---
.../file/sftp/source/SftpFileSourceFactory.java | 8 +++++---
.../connectors/seatunnel/hive/sink/HiveSink.java | 10 ++++------
.../connectors/seatunnel/hive/source/HiveSource.java | 6 +++---
.../catalog/redshift/RedshiftDataTypeConvertor.java | 20 +++++++++++++++++++-
.../seatunnel/redshift/sink/S3RedshiftFactory.java | 2 +-
.../test/resources/json/fake_to_local_file_json.conf | 2 +-
.../resources/json/local_file_json_to_assert.conf | 2 +-
.../test/resources/orc/fake_to_local_file_orc.conf | 2 +-
.../test/resources/orc/local_file_orc_to_assert.conf | 4 ++--
.../parquet/fake_to_local_file_parquet.conf | 2 +-
.../parquet/local_file_parquet_to_assert.conf | 2 +-
.../test/resources/text/fake_to_local_file_text.conf | 2 +-
.../resources/text/local_file_text_skip_headers.conf | 2 +-
.../resources/text/local_file_text_to_assert.conf | 2 +-
.../src/test/resources/batch_fakesource_to_file.conf | 2 +-
.../resources/batch_fakesource_to_file_complex.conf | 2 +-
.../cluster_batch_fake_to_localfile_template.conf | 2 +-
...atch_fake_to_localfile_two_pipeline_template.conf | 2 +-
.../streaming_fakesource_to_file_complex.conf | 2 +-
57 files changed, 277 insertions(+), 187 deletions(-)
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index f49b34348..b627c4cf0 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -20,7 +20,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -39,7 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -95,7 +95,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -198,7 +198,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
HdfsFile {
fs.defaultFS = "hdfs://hadoopcluster"
path = "/tmp/hive/warehouse/test2"
- file_format = "text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
@@ -228,7 +228,7 @@ HdfsFile {
custom_filename = true
file_name_expression = "${transactionId}_${now}"
filename_time_format = "yyyy.MM.dd"
- file_format = "parquet"
+ file_format_type = "parquet"
sink_columns = ["name","age"]
is_enable_transaction = true
}
diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
index 7c4b2b4e0..c9592e35c 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -23,7 +23,7 @@ It only supports hadoop version **2.9.X+**.
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -103,7 +103,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -188,7 +188,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
@@ -218,7 +218,7 @@ For parquet file format with `have_partition` and `sink_columns`
partition_by = ["age"]
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
- file_format = "parquet"
+ file_format_type = "parquet"
sink_columns = ["name","age"]
}
@@ -234,7 +234,7 @@ For orc file format simple config
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "orc"
+ file_format_type = "orc"
}
```
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md
index 5447a7098..a43e00562 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -23,7 +23,7 @@ It only supports hadoop version **2.9.X+**.
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -103,7 +103,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -188,7 +188,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
@@ -214,7 +214,7 @@ For parquet file format with `sink_columns`
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "parquet"
+ file_format_type = "parquet"
sink_columns = ["name","age"]
}
@@ -230,7 +230,7 @@ For orc file format simple config
access_key = "xxxxxxxxxxx"
access_secret = "xxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- file_format = "orc"
+ file_format_type = "orc"
}
```
diff --git a/docs/en/connector-v2/sink/S3-Redshift.md b/docs/en/connector-v2/sink/S3-Redshift.md
index 331ca1599..978ffc7c9 100644
--- a/docs/en/connector-v2/sink/S3-Redshift.md
+++ b/docs/en/connector-v2/sink/S3-Redshift.md
@@ -17,7 +17,7 @@ Output data to AWS Redshift.
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -38,7 +38,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| access_secret | string | no | - |
| hadoop_s3_properties | map | no | - |
| file_name_expression | string | no | "${transactionId}" |
-| file_format | string | no | "text" |
+| file_format_type | string | no | "text" |
| filename_time_format | string | no | "yyyy.MM.dd" |
| field_delimiter | string | no | '\001' |
| row_delimiter | string | no | "\n" |
@@ -118,7 +118,7 @@ hadoop_s3_properties {
Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file.
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -206,7 +206,7 @@ For text file format
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
- file_format="text"
+ file_format_type = "text"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
@@ -234,7 +234,7 @@ For parquet file format
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
- file_format="parquet"
+ file_format_type = "parquet"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
@@ -262,7 +262,7 @@ For orc file format
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
- file_format="orc"
+ file_format_type = "orc"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
hadoop_s3_properties {
diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index 4229564c0..c544ae63b 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -22,7 +22,7 @@ To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -120,7 +120,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -205,7 +205,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
path="/seatunnel/text"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
- file_format="text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
@@ -237,7 +237,7 @@ For parquet file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCr
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
- file_format="parquet"
+ file_format_type = "parquet"
hadoop_s3_properties {
"fs.s3a.buffer.dir" = "/data/st_test/s3a"
"fs.s3a.fast.upload.buffer" = "disk"
@@ -258,7 +258,7 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden
fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
- file_format="orc"
+ file_format_type = "orc"
}
```
diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md
index 0ee306c80..ac17d50e4 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -20,7 +20,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
By default, we use 2PC commit to ensure `exactly-once`
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -39,7 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
| custom_filename | boolean | no | false | Whether you need custom the filename |
| file_name_expression | string | no | "${transactionId}" | Only used when custom_filename is true |
| filename_time_format | string | no | "yyyy.MM.dd" | Only used when custom_filename is true |
-| file_format | string | no | "csv" | |
+| file_format_type | string | no | "csv" | |
| field_delimiter | string | no | '\001' | Only used when file_format is text |
| row_delimiter | string | no | "\n" | Only used when file_format is text |
| have_partition | boolean | no | false | Whether you need processing partitions. |
@@ -100,7 +100,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
| m | Minute in hour |
| s | Second in minute |
-### file_format [string]
+### file_format_type [string]
We supported as the following file types:
@@ -185,7 +185,7 @@ SftpFile {
username = "username"
password = "password"
path = "/data/sftp"
- file_format = "text"
+ file_format_type = "text"
field_delimiter = "\t"
row_delimiter = "\n"
have_partition = true
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 124fac7a0..bc5c0519e 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] json
@@ -36,7 +36,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| user | string | yes | - |
| password | string | yes | - |
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| read_columns | list | no | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
@@ -139,7 +139,7 @@ The file type supported column projection as the following shown:
**Tips: If the user wants to use this feature when reading `text` `json` `csv` files, the schema option must be configured**
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -230,7 +230,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
port = 21
user = tyrantlucifer
password = tianchao
- type = "text"
+ file_format_type = "text"
schema = {
name = string
age = int
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index cde7be449..9bc27bfff 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -25,7 +25,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -37,7 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| fs.defaultFS | string | yes | - |
| read_columns | list | yes | - |
| hdfs_site_path | string | no | - |
@@ -110,7 +110,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -244,7 +244,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
HdfsFile {
path = "/apps/hive/demo/student"
- type = "parquet"
+ file_format_type = "parquet"
fs.defaultFS = "hdfs://namenode001"
}
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index 6a74d18ab..69f517075 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -25,7 +25,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -37,7 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| read_columns | list | no | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
@@ -106,7 +106,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -224,7 +224,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
LocalFile {
path = "/apps/hive/demo/student"
- type = "parquet"
+ file_format_type = "parquet"
}
```
@@ -239,7 +239,7 @@ LocalFile {
}
}
path = "/apps/hive/demo/student"
- type = "json"
+ file_format_type = "json"
}
```
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index 0ca6eeb50..7479ee038 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -28,7 +28,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -40,7 +40,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| bucket | string | yes | - |
| access_key | string | yes | - |
| access_secret | string | yes | - |
@@ -113,7 +113,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -251,7 +251,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- type = "orc"
+ file_format_type = "orc"
}
```
@@ -264,7 +264,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- type = "json"
+ file_format_type = "json"
schema {
fields {
id = int
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index fef93da5b..f7efeb7fe 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -28,7 +28,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [ ] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -40,7 +40,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------|---------|----------|---------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| bucket | string | yes | - |
| access_key | string | yes | - |
| access_secret | string | yes | - |
@@ -113,7 +113,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -251,7 +251,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- type = "orc"
+ file_format_type = "orc"
}
```
@@ -264,7 +264,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
endpoint = "oss-cn-beijing.aliyuncs.com"
- type = "json"
+ file_format_type = "json"
schema {
fields {
id = int
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index 65611e770..dee60e979 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -27,7 +27,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] parquet
@@ -39,7 +39,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
| name | type | required | default value |
|---------------------------------|---------|----------|-------------------------------------------------------|
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| bucket | string | yes | - |
| fs.s3a.endpoint | string | yes | - |
| fs.s3a.aws.credentials.provider | string | yes | com.amazonaws.auth.InstanceProfileCredentialsProvider |
@@ -124,7 +124,7 @@ For example, set like following:
then Seatunnel will skip the first 2 lines from source files
-### type [string]
+### file_format_type [string]
File type, supported as the following file types:
@@ -269,7 +269,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
access_key = "xxxxxxxxxxxxxxxxx"
secret_key = "xxxxxxxxxxxxxxxxx"
bucket = "s3a://seatunnel-test"
- type = "orc"
+ file_format_type = "orc"
}
```
@@ -281,7 +281,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
bucket = "s3a://seatunnel-test"
fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
- type = "json"
+ file_format_type = "json"
schema {
fields {
id = int
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index 1563874ae..fd27d4fb3 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
- [x] [column projection](../../concept/connector-v2-features.md)
- [x] [parallelism](../../concept/connector-v2-features.md)
- [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
- [x] text
- [x] csv
- [x] json
@@ -36,7 +36,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
| user | string | yes | - |
| password | string | yes | - |
| path | string | yes | - |
-| type | string | yes | - |
+| file_format_type | string | yes | - |
| delimiter | string | no | \001 |
| parse_partition_from_path | boolean | no | true |
| date_format | string | no | yyyy-MM-dd |
@@ -138,7 +138,7 @@ The file type supported column projection as the following shown:
**Tip: If the user wants to use this feature when reading `text`, `json`, or `csv` files, the schema option must be configured**
-### type [string]
+### file_format_type [string]
File type. The following file types are supported:
@@ -229,7 +229,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
port = 21
user = tyrantlucifer
password = tianchao
- type = "text"
+ file_format_type = "text"
schema = {
name = string
age = int
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 00dff82b0..2a64a6254 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -45,7 +45,7 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
CheckConfigUtil.checkAllExists(
pluginConfig,
HdfsSourceConfig.FILE_PATH.key(),
- HdfsSourceConfig.FILE_TYPE.key(),
+ HdfsSourceConfig.FILE_FORMAT_TYPE.key(),
HdfsSourceConfig.DEFAULT_FS.key());
if (!result.isSuccess()) {
throw new FileConnectorException(
@@ -55,7 +55,8 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
readStrategy =
- ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()));
+ ReadStrategyFactory.of(
+ pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key());
hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key()));
@@ -87,7 +88,9 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(
- pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()).toUpperCase());
+ pluginConfig
+ .getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key())
+ .toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index a44b3d458..9a0ac6c67 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -76,11 +76,11 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
this.fileNameExpression = config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key());
}
- if (config.hasPath(BaseSinkConfig.FILE_FORMAT.key())
- && !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT.key()))) {
+ if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key())
+ && !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
this.fileFormat =
FileFormat.valueOf(
- config.getString(BaseSinkConfig.FILE_FORMAT.key())
+ config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key())
.toUpperCase(Locale.ROOT));
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 43403f632..8e0decf73 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -179,11 +179,11 @@ public class BaseSinkConfig {
.withDescription(
"Only used when `custom_filename` is true. The time format of the path");
- public static final Option<FileFormat> FILE_FORMAT =
- Options.key("file_format")
+ public static final Option<FileFormat> FILE_FORMAT_TYPE =
+ Options.key("file_format_type")
.enumType(FileFormat.class)
.defaultValue(FileFormat.CSV)
- .withDescription("File format type");
+ .withDescription("File format type, e.g. csv, orc, parquet, text");
public static final Option<List<String>> SINK_COLUMNS =
Options.key("sink_columns")
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 747b972aa..43855399f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -27,11 +27,12 @@ import org.apache.seatunnel.format.text.constant.TextFormatConstant;
import java.util.List;
public class BaseSourceConfig {
- public static final Option<FileFormat> FILE_TYPE =
- Options.key("type")
+ public static final Option<FileFormat> FILE_FORMAT_TYPE =
+ Options.key("file_format_type")
.objectType(FileFormat.class)
.noDefaultValue()
- .withDescription("File type");
+ .withDescription(
+ "File format type, e.g. json, csv, text, parquet, orc, avro....");
public static final Option<String> FILE_PATH =
Options.key("path")
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 144a6e9ef..4b931cb89 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -144,7 +144,9 @@ public class TextReadStrategy extends AbstractReadStrategy {
} else {
FileFormat fileFormat =
FileFormat.valueOf(
- pluginConfig.getString(BaseSourceConfig.FILE_TYPE.key()).toUpperCase());
+ pluginConfig
+ .getString(BaseSourceConfig.FILE_FORMAT_TYPE.key())
+ .toUpperCase());
if (fileFormat == FileFormat.CSV) {
fieldDelimiter = ",";
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index fa58c1d89..fad1d7506 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -42,21 +42,27 @@ public class FtpFileSinkFactory implements TableSinkFactory {
.required(FtpConfig.FTP_PORT)
.required(FtpConfig.FTP_USERNAME)
.required(FtpConfig.FTP_PASSWORD)
- .optional(BaseSinkConfig.FILE_FORMAT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER,
BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.CSV,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.JSON,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.ORC,
+ BaseSinkConfig.ORC_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.PARQUET,
BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index 017427f77..18e31dadf 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -54,7 +54,7 @@ public class FtpFileSource extends BaseFileSource {
CheckConfigUtil.checkAllExists(
pluginConfig,
FtpConfig.FILE_PATH.key(),
- FtpConfig.FILE_TYPE.key(),
+ FtpConfig.FILE_FORMAT_TYPE.key(),
FtpConfig.FTP_HOST.key(),
FtpConfig.FTP_PORT.key(),
FtpConfig.FTP_USERNAME.key(),
@@ -67,13 +67,15 @@ public class FtpFileSource extends BaseFileSource {
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
FileFormat fileFormat =
- FileFormat.valueOf(pluginConfig.getString(FtpConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat.valueOf(
+ pluginConfig.getString(FtpConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
throw new FileConnectorException(
CommonErrorCode.ILLEGAL_ARGUMENT,
"Ftp file source connector only support read [text, csv, json] files");
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_TYPE.key()));
+ readStrategy =
+ ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(FtpConfig.FILE_PATH.key());
hadoopConf = FtpConf.buildWithConfig(pluginConfig);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 65ff449f4..5e8c34172 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -46,11 +46,13 @@ public class FtpFileSourceFactory implements TableSourceFactory {
.required(FtpConfig.FTP_PORT)
.required(FtpConfig.FTP_USERNAME)
.required(FtpConfig.FTP_PASSWORD)
- .required(FtpConfig.FILE_TYPE)
+ .required(FtpConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ BaseSourceConfig.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSourceConfig.DELIMITER)
.conditional(
- BaseSourceConfig.FILE_TYPE,
+ BaseSourceConfig.FILE_FORMAT_TYPE,
Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index 9a2064bd9..22e389403 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -39,21 +39,27 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
return OptionRule.builder()
.required(HdfsSourceConfig.DEFAULT_FS)
.required(BaseSinkConfig.FILE_PATH)
- .optional(BaseSinkConfig.FILE_FORMAT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER,
BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.CSV,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.JSON,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.ORC,
+ BaseSinkConfig.ORC_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.PARQUET,
BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index e961bb137..ef2f8a54b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -43,11 +43,13 @@ public class HdfsFileSourceFactory implements TableSourceFactory {
return OptionRule.builder()
.required(HdfsSourceConfig.FILE_PATH)
.required(HdfsSourceConfig.DEFAULT_FS)
- .required(BaseSourceConfig.FILE_TYPE)
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ BaseSourceConfig.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSourceConfig.DELIMITER)
.conditional(
- BaseSourceConfig.FILE_TYPE,
+ BaseSourceConfig.FILE_FORMAT_TYPE,
Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index 41694bb8a..19a8d17ee 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -37,21 +37,27 @@ public class LocalFileSinkFactory implements TableSinkFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(BaseSinkConfig.FILE_PATH)
- .optional(BaseSinkConfig.FILE_FORMAT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER,
BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.CSV,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.JSON,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.ORC,
+ BaseSinkConfig.ORC_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.PARQUET,
BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 673256c2a..01cb7dcb1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -57,7 +57,7 @@ public class LocalFileSource extends BaseFileSource {
CheckConfigUtil.checkAllExists(
pluginConfig,
LocalSourceConfig.FILE_PATH.key(),
- LocalSourceConfig.FILE_TYPE.key());
+ LocalSourceConfig.FILE_FORMAT_TYPE.key());
if (!result.isSuccess()) {
throw new FileConnectorException(
SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -66,7 +66,8 @@ public class LocalFileSource extends BaseFileSource {
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
readStrategy =
- ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()));
+ ReadStrategyFactory.of(
+ pluginConfig.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(LocalSourceConfig.FILE_PATH.key());
hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
@@ -80,7 +81,9 @@ public class LocalFileSource extends BaseFileSource {
// support user-defined schema
FileFormat fileFormat =
FileFormat.valueOf(
- pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()).toUpperCase());
+ pluginConfig
+ .getString(LocalSourceConfig.FILE_FORMAT_TYPE.key())
+ .toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index ea960cba3..d6f982352 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -42,11 +42,13 @@ public class LocalFileSourceFactory implements TableSourceFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(LocalSourceConfig.FILE_PATH)
- .required(BaseSourceConfig.FILE_TYPE)
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ BaseSourceConfig.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSourceConfig.DELIMITER)
.conditional(
- BaseSourceConfig.FILE_TYPE,
+ BaseSourceConfig.FILE_FORMAT_TYPE,
Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 655988131..5beb85921 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -42,21 +42,27 @@ public class OssFileSinkFactory implements TableSinkFactory {
.required(OssConfig.ACCESS_KEY)
.required(OssConfig.ACCESS_SECRET)
.required(OssConfig.ENDPOINT)
- .optional(BaseSinkConfig.FILE_FORMAT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER,
BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.CSV,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.JSON,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.ORC,
+ BaseSinkConfig.ORC_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.PARQUET,
BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 5fcc468ce..1f978e5b3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -55,7 +55,7 @@ public class OssFileSource extends BaseFileSource {
CheckConfigUtil.checkAllExists(
pluginConfig,
OssConfig.FILE_PATH.key(),
- OssConfig.FILE_TYPE.key(),
+ OssConfig.FILE_FORMAT_TYPE.key(),
OssConfig.BUCKET.key(),
OssConfig.ACCESS_KEY.key(),
OssConfig.ACCESS_SECRET.key(),
@@ -67,7 +67,8 @@ public class OssFileSource extends BaseFileSource {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
+ readStrategy =
+ ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
hadoopConf = OssConf.buildWithConfig(pluginConfig);
@@ -80,7 +81,8 @@ public class OssFileSource extends BaseFileSource {
}
// support user-defined schema
FileFormat fileFormat =
- FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat.valueOf(
+ pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 103b8addb..c9b77de9d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -46,11 +46,13 @@ public class OssFileSourceFactory implements TableSourceFactory {
.required(OssConfig.ACCESS_KEY)
.required(OssConfig.ACCESS_SECRET)
.required(OssConfig.ENDPOINT)
- .required(BaseSourceConfig.FILE_TYPE)
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ BaseSourceConfig.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSourceConfig.DELIMITER)
.conditional(
- BaseSourceConfig.FILE_TYPE,
+ BaseSourceConfig.FILE_FORMAT_TYPE,
Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 655988131..5beb85921 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -42,21 +42,27 @@ public class OssFileSinkFactory implements TableSinkFactory {
.required(OssConfig.ACCESS_KEY)
.required(OssConfig.ACCESS_SECRET)
.required(OssConfig.ENDPOINT)
- .optional(BaseSinkConfig.FILE_FORMAT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER,
BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.CSV,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.JSON,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.ORC,
+ BaseSinkConfig.ORC_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.PARQUET,
BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 1402d03fb..97d283460 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -54,7 +54,7 @@ public class OssFileSource extends BaseFileSource {
CheckConfigUtil.checkAllExists(
pluginConfig,
OssConfig.FILE_PATH.key(),
- OssConfig.FILE_TYPE.key(),
+ OssConfig.FILE_FORMAT_TYPE.key(),
OssConfig.BUCKET.key(),
OssConfig.ACCESS_KEY.key(),
OssConfig.ACCESS_SECRET.key(),
@@ -66,7 +66,8 @@ public class OssFileSource extends BaseFileSource {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
+ readStrategy =
+ ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
hadoopConf = OssConf.buildWithConfig(pluginConfig);
@@ -79,7 +80,8 @@ public class OssFileSource extends BaseFileSource {
}
// support user-defined schema
FileFormat fileFormat =
- FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
+ FileFormat.valueOf(
+ pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 103b8addb..c9b77de9d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -46,11 +46,13 @@ public class OssFileSourceFactory implements TableSourceFactory {
.required(OssConfig.ACCESS_KEY)
.required(OssConfig.ACCESS_SECRET)
.required(OssConfig.ENDPOINT)
- .required(BaseSourceConfig.FILE_TYPE)
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ BaseSourceConfig.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSourceConfig.DELIMITER)
.conditional(
- BaseSourceConfig.FILE_TYPE,
+ BaseSourceConfig.FILE_FORMAT_TYPE,
Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
index 9b16b3419..816597f05 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
@@ -73,10 +73,10 @@ public class S3Catalog implements Catalog {
@Override
public void open() throws CatalogException {
ReadStrategy readStrategy =
- ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+ ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(s3Config);
this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key());
- readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+ readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(s3Config);
try {
fileSystem =
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 685068cc9..86a08931a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -47,21 +47,27 @@ public class S3FileSinkFactory implements TableSinkFactory {
S3Config.S3_ACCESS_KEY,
S3Config.S3_SECRET_KEY)
.optional(S3Config.S3_PROPERTIES)
- .optional(BaseSinkConfig.FILE_FORMAT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER,
BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.CSV,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.JSON,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.ORC,
+ BaseSinkConfig.ORC_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.PARQUET,
BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index 0136b6a7d..fea806c46 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -54,7 +54,7 @@ public class S3FileSource extends BaseFileSource {
CheckConfigUtil.checkAllExists(
pluginConfig,
S3Config.FILE_PATH.key(),
- S3Config.FILE_TYPE.key(),
+ S3Config.FILE_FORMAT_TYPE.key(),
S3Config.S3_BUCKET.key());
if (!result.isSuccess()) {
throw new FileConnectorException(
@@ -63,7 +63,8 @@ public class S3FileSource extends BaseFileSource {
"PluginName: %s, PluginType: %s, Message: %s",
getPluginName(), PluginType.SOURCE, result.getMsg()));
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_TYPE.key()));
+ readStrategy =
+ ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(S3Config.FILE_PATH.key());
hadoopConf = S3Conf.buildWithConfig(pluginConfig);
@@ -76,7 +77,8 @@ public class S3FileSource extends BaseFileSource {
}
// support user-defined schema
FileFormat fileFormat =
- FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE.key()).toUpperCase());
+ FileFormat.valueOf(
+ pluginConfig.getString(S3Config.FILE_FORMAT_TYPE.key()).toUpperCase());
// only json text csv type support user-defined schema now
if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 165d0d741..51a5e536f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -42,7 +42,7 @@ public class S3FileSourceFactory implements TableSourceFactory {
public OptionRule optionRule() {
return OptionRule.builder()
.required(S3Config.FILE_PATH)
- .required(S3Config.FILE_TYPE)
+ .required(S3Config.FILE_FORMAT_TYPE)
.required(S3Config.S3_BUCKET)
.required(S3Config.FS_S3A_ENDPOINT)
.required(S3Config.S3A_AWS_CREDENTIALS_PROVIDER)
@@ -53,9 +53,11 @@ public class S3FileSourceFactory implements TableSourceFactory {
S3Config.S3_SECRET_KEY)
.optional(S3Config.S3_PROPERTIES)
.conditional(
- BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ BaseSourceConfig.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSourceConfig.DELIMITER)
.conditional(
- BaseSourceConfig.FILE_TYPE,
+ BaseSourceConfig.FILE_FORMAT_TYPE,
Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index ac96f89de..8cad73445 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -42,21 +42,27 @@ public class SftpFileSinkFactory implements TableSinkFactory {
.required(SftpConfig.SFTP_PORT)
.required(SftpConfig.SFTP_USERNAME)
.required(SftpConfig.SFTP_PASSWORD)
- .optional(BaseSinkConfig.FILE_FORMAT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.TEXT,
BaseSinkConfig.ROW_DELIMITER,
BaseSinkConfig.FIELD_DELIMITER,
BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.CSV,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.JSON,
+ BaseSinkConfig.TXT_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+ BaseSinkConfig.FILE_FORMAT_TYPE,
+ FileFormat.ORC,
+ BaseSinkConfig.ORC_COMPRESS)
.conditional(
- BaseSinkConfig.FILE_FORMAT,
+ BaseSinkConfig.FILE_FORMAT_TYPE,
FileFormat.PARQUET,
BaseSinkConfig.PARQUET_COMPRESS)
.optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index 306835cb0..28a06d17a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -54,7 +54,7 @@ public class SftpFileSource extends BaseFileSource {
CheckConfigUtil.checkAllExists(
pluginConfig,
SftpConfig.FILE_PATH.key(),
- SftpConfig.FILE_TYPE.key(),
+ SftpConfig.FILE_FORMAT_TYPE.key(),
SftpConfig.SFTP_HOST.key(),
SftpConfig.SFTP_PORT.key(),
SftpConfig.SFTP_USERNAME.key(),
@@ -68,13 +68,14 @@ public class SftpFileSource extends BaseFileSource {
}
FileFormat fileFormat =
FileFormat.valueOf(
- pluginConfig.getString(SftpConfig.FILE_TYPE.key()).toUpperCase());
+ pluginConfig.getString(SftpConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
throw new FileConnectorException(
CommonErrorCode.ILLEGAL_ARGUMENT,
"Sftp file source connector only support read [text, csv, json] files");
}
- readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_TYPE.key()));
+ readStrategy =
+ ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_FORMAT_TYPE.key()));
readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(SftpConfig.FILE_PATH.key());
hadoopConf = SftpConf.buildWithConfig(pluginConfig);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index dcec3f343..32ecfa392 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -46,11 +46,13 @@ public class SftpFileSourceFactory implements TableSourceFactory {
.required(SftpConfig.SFTP_PORT)
.required(SftpConfig.SFTP_USERNAME)
.required(SftpConfig.SFTP_PASSWORD)
- .required(BaseSourceConfig.FILE_TYPE)
+ .required(BaseSourceConfig.FILE_FORMAT_TYPE)
.conditional(
- BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+ BaseSourceConfig.FILE_FORMAT_TYPE,
+ FileFormat.TEXT,
+ BaseSourceConfig.DELIMITER)
.conditional(
- BaseSourceConfig.FILE_TYPE,
+ BaseSourceConfig.FILE_FORMAT_TYPE,
Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
CatalogTableUtil.SCHEMA)
.optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index b773bc4bd..19068fcf3 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -17,9 +17,6 @@
package org.apache.seatunnel.connectors.seatunnel.hive.sink;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigValueFactory;
-
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -55,6 +52,7 @@ import java.util.stream.Collectors;
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_PATH;
import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.HAVE_PARTITION;
@@ -140,7 +138,7 @@ public class HiveSink extends BaseHdfsFileSink {
pluginConfig =
pluginConfig
.withValue(
- FILE_FORMAT.key(),
+ FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
.withValue(
FIELD_DELIMITER.key(),
@@ -151,12 +149,12 @@ public class HiveSink extends BaseHdfsFileSink {
} else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
pluginConfig =
pluginConfig.withValue(
- FILE_FORMAT.key(),
+ FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
} else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
pluginConfig =
pluginConfig.withValue(
- FILE_FORMAT.key(),
+ FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
} else {
throw new HiveConnectorException(
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 5f2e27424..8ec5d2d3f 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -126,7 +126,7 @@ public class HiveSource extends BaseHdfsFileSource {
if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
pluginConfig =
pluginConfig.withValue(
- FILE_TYPE.key(),
+ FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
// Build schema from hive table information
// Because the entrySet in typesafe config couldn't keep key-value order
@@ -140,12 +140,12 @@ public class HiveSource extends BaseHdfsFileSource {
} else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
pluginConfig =
pluginConfig.withValue(
- FILE_TYPE.key(),
+ FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
} else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
pluginConfig =
pluginConfig.withValue(
- FILE_TYPE.key(),
+ FILE_FORMAT_TYPE.key(),
ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
} else {
throw new HiveConnectorException(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
index 44b875f6b..2c11040fd 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
@@ -71,7 +89,7 @@ public class RedshiftDataTypeConvertor implements DataTypeConvertor<String> {
private static final String REDSHIFT_TIMETZ = "timetz";
private static final String REDSHIFT_TIMESTAMP = "timestamp";
private static final String REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE =
- "timestamp without time zone";
+ "timestamp without time zone";
private static final String REDSHIFT_TIMESTAMPTZ = "timestamptz";
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
index 2a3a7d66c..0211cadbe 100644
--- a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
@@ -46,7 +46,7 @@ public class S3RedshiftFactory implements TableSinkFactory {
S3RedshiftConfig.EXECUTE_SQL,
BaseSourceConfig.FILE_PATH)
.optional(S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
- .optional(BaseSinkConfig.FILE_FORMAT)
+ .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
.optional(BaseSinkConfig.FILENAME_TIME_FORMAT)
.optional(BaseSinkConfig.FIELD_DELIMITER)
.optional(BaseSinkConfig.ROW_DELIMITER)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
index 9d48af1cf..9f58e0f22 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
@@ -72,7 +72,7 @@ sink {
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
- file_format = "json"
+ file_format_type = "json"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf
index ba94dce76..19f7b95a8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf
@@ -28,7 +28,7 @@ env {
source {
LocalFile {
path = "/seatunnel/read/json"
- type = "json"
+ file_format_type = "json"
schema = {
fields {
c_map = "map<string, string>"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
index a134e9fc0..3490fb169 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
@@ -72,7 +72,7 @@ sink {
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
- file_format = "orc"
+ file_format_type = "orc"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
compress_codec = "zlib"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
index 7549af6fd..77a9c3a58 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
@@ -27,8 +27,8 @@ env {
source {
LocalFile {
- path = "/seatunnel/read/orc"
- type = "orc"
+ path = "/seatunnel/read/parquet"
+ file_format_type = "parquet"
result_table_name = "fake"
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
index c3eae33b0..a915229f5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
@@ -72,7 +72,7 @@ sink {
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
- file_format = "parquet"
+ file_format_type = "parquet"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
compress_codec = "gzip"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
index d99fef31a..07776f0b8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
@@ -28,7 +28,7 @@ env {
source {
LocalFile {
path = "/seatunnel/read/parquet"
- type = "parquet"
+ file_format_type = "parquet"
result_table_name = "fake"
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
index 795d82234..294c4538e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
@@ -72,7 +72,7 @@ sink {
partition_dir_expression = "${k0}=${v0}"
is_partition_field_write_in_file = true
file_name_expression = "${transactionId}_${now}"
- file_format = "text"
+ file_format_type = "text"
filename_time_format = "yyyy.MM.dd"
is_enable_transaction = true
compress_codec = "lzo"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_skip_headers.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_skip_headers.conf
index c2b6e0402..7ebeba63a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_skip_headers.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_skip_headers.conf
@@ -28,7 +28,7 @@ env {
source {
LocalFile {
path = "/seatunnel/read/text"
- type = "text"
+ file_format_type = "text"
skip_header_row_number = 1
schema = {
fields {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf
index 11c0a5710..05ce1ee6e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf
@@ -28,7 +28,7 @@ env {
source {
LocalFile {
path = "/seatunnel/read/text"
- type = "text"
+ file_format_type = "text"
schema = {
fields {
c_map = "map<string, string>"
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index 10c2b673e..78ea7db94 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -86,7 +86,7 @@ sink {
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
- file_format="text"
+ file_format_type="text"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error"
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index 2a327e463..5bbd38ada 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -132,7 +132,7 @@ sink {
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
- file_format="text"
+ file_format_type="text"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error",
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index 1c3bd3ebe..f469c961f 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -84,7 +84,7 @@ sink {
field_delimiter="\t"
row_delimiter="\n"
file_name_expression="${transactionId}_${now}"
- file_format="text"
+ file_format_type="text"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error"
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_two_pipeline_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_two_pipeline_template.conf
index ef1ce902a..4d7f65d53 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_two_pipeline_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_two_pipeline_template.conf
@@ -146,7 +146,7 @@ sink {
field_delimiter="\t"
row_delimiter="\n"
file_name_expression="${transactionId}_${now}"
- file_format="text"
+ file_format_type="text"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error"
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index 19ecb9359..96daf1365 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -132,7 +132,7 @@ sink {
partition_dir_expression="${k0}=${v0}"
is_partition_field_write_in_file=true
file_name_expression="${transactionId}_${now}"
- file_format="text"
+ file_format_type="text"
filename_time_format="yyyy.MM.dd"
is_enable_transaction=true
save_mode="error",