Posted to commits@seatunnel.apache.org by we...@apache.org on 2023/03/03 07:57:48 UTC

[incubator-seatunnel] branch cdc-multiple-table updated: Change file type to file_format_type in file source/sink (#4249)

This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/cdc-multiple-table by this push:
     new 0a10c40de Change file type to file_format_type in file source/sink (#4249)
0a10c40de is described below

commit 0a10c40de4daa95ffd411cdcff112d7f0026c1d2
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Mar 3 15:57:42 2023 +0800

    Change file type to file_format_type in file source/sink (#4249)
---
 docs/en/connector-v2/sink/HdfsFile.md                  | 10 +++++-----
 docs/en/connector-v2/sink/OssFile.md                   | 12 ++++++------
 docs/en/connector-v2/sink/OssJindoFile.md              | 12 ++++++------
 docs/en/connector-v2/sink/S3-Redshift.md               | 12 ++++++------
 docs/en/connector-v2/sink/S3File.md                    | 12 ++++++------
 docs/en/connector-v2/sink/SftpFile.md                  |  8 ++++----
 docs/en/connector-v2/source/FtpFile.md                 |  8 ++++----
 docs/en/connector-v2/source/HdfsFile.md                |  8 ++++----
 docs/en/connector-v2/source/LocalFile.md               | 10 +++++-----
 docs/en/connector-v2/source/OssFile.md                 | 10 +++++-----
 docs/en/connector-v2/source/OssJindoFile.md            | 10 +++++-----
 docs/en/connector-v2/source/S3File.md                  | 10 +++++-----
 docs/en/connector-v2/source/SftpFile.md                |  8 ++++----
 .../seatunnel/file/hdfs/source/BaseHdfsFileSource.java |  6 +++---
 .../seatunnel/file/config/BaseFileSinkConfig.java      |  6 +++---
 .../seatunnel/file/config/BaseSinkConfig.java          |  4 ++--
 .../seatunnel/file/config/BaseSourceConfig.java        |  4 ++--
 .../seatunnel/file/source/reader/TextReadStrategy.java |  2 +-
 .../seatunnel/file/ftp/sink/FtpFileSinkFactory.java    | 12 ++++++------
 .../seatunnel/file/ftp/source/FtpFileSource.java       |  6 +++---
 .../file/ftp/source/FtpFileSourceFactory.java          |  6 +++---
 .../seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java  | 12 ++++++------
 .../file/hdfs/source/HdfsFileSourceFactory.java        |  6 +++---
 .../file/local/sink/LocalFileSinkFactory.java          | 12 ++++++------
 .../seatunnel/file/local/source/LocalFileSource.java   |  6 +++---
 .../file/local/source/LocalFileSourceFactory.java      |  6 +++---
 .../seatunnel/file/oss/sink/OssFileSinkFactory.java    | 12 ++++++------
 .../seatunnel/file/oss/source/OssFileSource.java       |  6 +++---
 .../file/oss/source/OssFileSourceFactory.java          |  6 +++---
 .../seatunnel/file/oss/sink/OssFileSinkFactory.java    | 12 ++++++------
 .../seatunnel/file/oss/source/OssFileSource.java       |  6 +++---
 .../file/oss/source/OssFileSourceFactory.java          |  6 +++---
 .../seatunnel/file/s3/catalog/S3Catalog.java           |  4 ++--
 .../seatunnel/file/s3/catalog/S3DataTypeConvertor.java |  1 -
 .../seatunnel/file/s3/sink/S3FileSinkFactory.java      | 12 ++++++------
 .../seatunnel/file/s3/source/S3FileSource.java         |  6 +++---
 .../seatunnel/file/s3/source/S3FileSourceFactory.java  |  6 +++---
 .../seatunnel/file/sftp/sink/SftpFileSinkFactory.java  | 12 ++++++------
 .../seatunnel/file/sftp/source/SftpFileSource.java     |  6 +++---
 .../file/sftp/source/SftpFileSourceFactory.java        |  6 +++---
 .../connectors/seatunnel/hive/sink/HiveSink.java       |  8 ++++----
 .../connectors/seatunnel/hive/source/HiveSource.java   |  6 +++---
 .../catalog/redshift/RedshiftDataTypeConvertor.java    | 18 ++++++++++++++++++
 .../seatunnel/redshift/sink/S3RedshiftFactory.java     |  2 +-
 44 files changed, 185 insertions(+), 168 deletions(-)
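
In user-facing configs, this commit unifies two option names: file sinks previously read `file_format` and file sources read `type`; both now read `file_format_type`. A minimal, self-contained Java sketch of how such a key resolves to the `FileFormat` enum, following the `valueOf(...toUpperCase(Locale.ROOT))` pattern visible in the `BaseFileSinkConfig` hunk below (the enum here is a stand-in for the real one in `connector-file-base`):

```java
import java.util.Locale;

public class FileFormatTypeDemo {
    // Stand-in for the connector's FileFormat enum; values mirror the
    // formats listed in the connector docs.
    enum FileFormat { TEXT, CSV, JSON, ORC, PARQUET }

    // Parse the file_format_type config value the way BaseFileSinkConfig
    // does: case-insensitive match against the enum constants.
    static FileFormat parse(String raw) {
        return FileFormat.valueOf(raw.toUpperCase(Locale.ROOT));
    }

    public static void main(String[] args) {
        System.out.println(parse("parquet")); // PARQUET
        // An unknown value such as "xml" throws IllegalArgumentException,
        // surfacing the misconfiguration before any data is written.
    }
}
```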

diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index dfb115ec4..0e7589fa4 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -20,7 +20,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -40,7 +40,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                      | Whether you need custom the filename                      |
 | file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                    |
-| file_format                      | string  | no       | "csv"                                      |                                                           |
+| file_format_type                 | string  | no       | "csv"                                      |                                                           |
 | field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format is text                        |
 | row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format is text                        |
 | have_partition                   | boolean | no       | false                                      | Whether you need processing partitions.                   |
@@ -95,7 +95,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We supported as the following file types:
 
@@ -196,7 +196,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
 HdfsFile {
     fs.defaultFS = "hdfs://hadoopcluster"
     path = "/tmp/hive/warehouse/test2"
-    file_format = "text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
@@ -226,7 +226,7 @@ HdfsFile {
     custom_filename = true
     file_name_expression = "${transactionId}_${now}"
     filename_time_format = "yyyy.MM.dd"
-    file_format = "parquet"
+    file_format_type = "parquet"
     sink_columns = ["name","age"]
     is_enable_transaction = true
 }
diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
index 5cef80d16..3ccdcf41e 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -23,7 +23,7 @@ It only supports hadoop version **2.9.X+**.
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
     - [x] text
     - [x] csv
     - [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                      | Whether you need custom the filename                      |
 | file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                    |
-| file_format                      | string  | no       | "csv"                                      |                                                           |
+| file_format_type                 | string  | no       | "csv"                                      |                                                           |
 | field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format is text                        |
 | row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format is text                        |
 | have_partition                   | boolean | no       | false                                      | Whether you need processing partitions.                   |
@@ -102,7 +102,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We supported as the following file types:
 
@@ -187,7 +187,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
@@ -217,7 +217,7 @@ For parquet file format with `have_partition` and `sink_columns`
     partition_by = ["age"]
     partition_dir_expression = "${k0}=${v0}"
     is_partition_field_write_in_file = true
-    file_format = "parquet"
+    file_format_type = "parquet"
     sink_columns = ["name","age"]
   }
 
@@ -233,7 +233,7 @@ For orc file format simple config
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "orc"
+    file_format_type = "orc"
   }
 
 ```
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md
index 4c0ccb9a0..23f02c7b7 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -23,7 +23,7 @@ It only supports hadoop version **2.9.X+**.
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
     - [x] text
     - [x] csv
     - [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                      | Whether you need custom the filename                      |
 | file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                    |
-| file_format                      | string  | no       | "csv"                                      |                                                           |
+| file_format_type                 | string  | no       | "csv"                                      |                                                           |
 | field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format is text                        |
 | row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format is text                        |
 | have_partition                   | boolean | no       | false                                      | Whether you need processing partitions.                   |
@@ -102,7 +102,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We supported as the following file types:
 
@@ -187,7 +187,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
@@ -213,7 +213,7 @@ For parquet file format with `sink_columns`
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "parquet"
+    file_format_type = "parquet"
     sink_columns = ["name","age"]
   }
 
@@ -229,7 +229,7 @@ For orc file format simple config
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "orc"
+    file_format_type = "orc"
   }
 
 ```
diff --git a/docs/en/connector-v2/sink/S3-Redshift.md b/docs/en/connector-v2/sink/S3-Redshift.md
index 7a519b770..41eeb514a 100644
--- a/docs/en/connector-v2/sink/S3-Redshift.md
+++ b/docs/en/connector-v2/sink/S3-Redshift.md
@@ -18,7 +18,7 @@ Output data to AWS Redshift.
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
     - [x] text
     - [x] csv
     - [x] parquet
@@ -39,7 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | access_secret                    | string  | no       | -                                                         |
 | hadoop_s3_properties             | map     | no       | -                                                         |
 | file_name_expression             | string  | no       | "${transactionId}"                                        |
-| file_format                      | string  | no       | "text"                                                    |
+| file_format_type                 | string  | no       | "text"                                                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                                              |
 | field_delimiter                  | string  | no       | '\001'                                                    |
 | row_delimiter                    | string  | no       | "\n"                                                      |
@@ -117,7 +117,7 @@ If you need to add a other option, you could add it here and refer to this [Hado
 
 Please note that, If `is_enable_transaction` is `true`, we will auto add `${transactionId}_` in the head of the file.
 
-### file_format [string]
+### file_format_type [string]
 
 We supported as the following file types:
 
@@ -205,7 +205,7 @@ For text file format
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
-    file_format="text"
+    file_format_type="text"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     hadoop_s3_properties {
@@ -233,7 +233,7 @@ For parquet file format
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
-    file_format="parquet"
+    file_format_type="parquet"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     hadoop_s3_properties {
@@ -261,7 +261,7 @@ For orc file format
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
-    file_format="orc"
+    file_format_type="orc"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     hadoop_s3_properties {
diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index 76ce51342..fbeae0bee 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -22,7 +22,7 @@ To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                                 | Whether you need custom the filename                                                                   |
 | file_name_expression             | string  | no       | "${transactionId}"                                    | Only used when custom_filename is true                                                                 |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                                          | Only used when custom_filename is true                                                                 |
-| file_format                      | string  | no       | "csv"                                                 |                                                                                                        |
+| file_format_type                 | string  | no       | "csv"                                                 |                                                                                                        |
 | field_delimiter                  | string  | no       | '\001'                                                | Only used when file_format is text                                                                     |
 | row_delimiter                    | string  | no       | "\n"                                                  | Only used when file_format is text                                                                     |
 | have_partition                   | boolean | no       | false                                                 | Whether you need processing partitions.                                                                |
@@ -116,7 +116,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We supported as the following file types:
 
@@ -201,7 +201,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
     path="/seatunnel/text"
     fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
     fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
-    file_format="text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
@@ -233,7 +233,7 @@ For parquet file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCr
     fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
     access_key = "xxxxxxxxxxxxxxxxx"
     secret_key = "xxxxxxxxxxxxxxxxx"
-    file_format="parquet"
+    file_format_type = "parquet"
     hadoop_s3_properties {
       "fs.s3a.buffer.dir" = "/data/st_test/s3a"
       "fs.s3a.fast.upload.buffer" = "disk"
@@ -254,7 +254,7 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden
     fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
     access_key = "xxxxxxxxxxxxxxxxx"
     secret_key = "xxxxxxxxxxxxxxxxx"
-    file_format="orc"
+    file_format_type = "orc"
   }
 
 ```
diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md
index 4879cde7b..6a807e677 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -20,7 +20,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
     - [x] text
     - [x] csv
     - [x] parquet
@@ -39,7 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                      | Whether you need custom the filename                      |
 | file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                    |
-| file_format                      | string  | no       | "csv"                                      |                                                           |
+| file_format_type                 | string  | no       | "csv"                                      |                                                           |
 | field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format is text                        |
 | row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format is text                        |
 | have_partition                   | boolean | no       | false                                      | Whether you need processing partitions.                   |
@@ -99,7 +99,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We supported as the following file types:
 
@@ -184,7 +184,7 @@ SftpFile {
     username = "username"
     password = "password"
     path = "/data/sftp"
-    file_format = "text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 6f774e79f..22006ec45 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
     - [x] text
     - [x] csv
     - [x] json
@@ -36,7 +36,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | user                      | string  | yes      | -                   |
 | password                  | string  | yes      | -                   |
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
 | date_format               | string  | no       | yyyy-MM-dd          |
@@ -124,7 +124,7 @@ then Seatunnel will skip the first 2 lines from source files
 
 The schema information of upstream data.
 
-### type [string]
+### file_format_type [string]
 
 File type, supported as the following file types:
 
@@ -216,7 +216,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     port = 21
     user = tyrantlucifer
     password = tianchao
-    type = "text"
+    file_format_type = "text"
     schema = {
       name = string
       age = int
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 240f89146..601e487d0 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -25,7 +25,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -37,7 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | name                      | type    | required | default value       |
 |---------------------------|---------|----------|---------------------|
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | fs.defaultFS              | string  | yes      | -                   |
 | hdfs_site_path            | string  | no       | -                   |
 | delimiter                 | string  | no       | \001                |
@@ -109,7 +109,7 @@ For example, set like following:
 
 then Seatunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type, supported as the following file types:
 
@@ -230,7 +230,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 HdfsFile {
   path = "/apps/hive/demo/student"
-  type = "parquet"
+  file_format_type = "parquet"
   fs.defaultFS = "hdfs://namenode001"
 }
 
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index 33acf7c90..7aa65360d 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -25,7 +25,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -37,7 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | name                      | type    | required | default value       |
 |---------------------------|---------|----------|---------------------|
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
 | date_format               | string  | no       | yyyy-MM-dd          |
@@ -105,7 +105,7 @@ For example, set like following:
 
 then Seatunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type, supported as the following file types:
 
@@ -209,7 +209,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 LocalFile {
   path = "/apps/hive/demo/student"
-  type = "parquet"
+  file_format_type = "parquet"
 }
 
 ```
@@ -224,7 +224,7 @@ LocalFile {
     }
   }
   path = "/apps/hive/demo/student"
-  type = "json"
+  file_format_type = "json"
 }
 
 ```
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index 038080961..8f619c0e8 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -28,7 +28,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -40,7 +40,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | name                      | type    | required | default value       |
 |---------------------------|---------|----------|---------------------|
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | bucket                    | string  | yes      | -                   |
 | access_key                | string  | yes      | -                   |
 | access_secret             | string  | yes      | -                   |
@@ -112,7 +112,7 @@ For example, set like following:
 
 then Seatunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type, supported as the following file types:
 
@@ -236,7 +236,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    type = "orc"
+    file_format_type = "orc"
   }
 
 ```
@@ -249,7 +249,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    type = "json"
+    file_format_type = "json"
     schema {
       fields {
         id = int 
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index b2ecd6d96..ada030e14 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -28,7 +28,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -40,7 +40,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | name                      | type    | required | default value       |
 |---------------------------|---------|----------|---------------------|
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | bucket                    | string  | yes      | -                   |
 | access_key                | string  | yes      | -                   |
 | access_secret             | string  | yes      | -                   |
@@ -112,7 +112,7 @@ For example, set like following:
 
 then Seatunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type, supported as the following file types:
 
@@ -236,7 +236,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    type = "orc"
+    file_format_type = "orc"
   }
 
 ```
@@ -249,7 +249,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    type = "json"
+    file_format_type = "json"
     schema {
       fields {
         id = int 
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index 15d0dbd9f..a423b7e8f 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -27,7 +27,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -39,7 +39,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 | name                            | type    | required | default value                                         |
 |---------------------------------|---------|----------|-------------------------------------------------------|
 | path                            | string  | yes      | -                                                     |
-| type                            | string  | yes      | -                                                     |
+| file_format_type                | string  | yes      | -                                                     |
 | bucket                          | string  | yes      | -                                                     |
 | fs.s3a.endpoint                 | string  | yes      | -                                                     |
 | fs.s3a.aws.credentials.provider | string  | yes      | com.amazonaws.auth.InstanceProfileCredentialsProvider |
@@ -121,7 +121,7 @@ For example, set like following:
 
 then Seatunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type, supported as the following file types:
 
@@ -251,7 +251,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     secret_key = "xxxxxxxxxxxxxxxxx"
     bucket = "s3a://seatunnel-test"
-    type = "orc"
+    file_format_type = "orc"
   }
 
 ```
@@ -263,7 +263,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     bucket = "s3a://seatunnel-test"
     fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
     fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
-    type = "json"
+    file_format_type = "json"
     schema {
       fields {
         id = int 
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index 8108894f4..9501941d7 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
     - [x] text
     - [x] csv
     - [x] json
@@ -36,7 +36,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | user                      | string  | yes      | -                   |
 | password                  | string  | yes      | -                   |
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
 | date_format               | string  | no       | yyyy-MM-dd          |
@@ -124,7 +124,7 @@ then Seatunnel will skip the first 2 lines from source files
 
 The schema information of upstream data.
 
-### type [string]
+### file_format_type [string]
 
 File type, supported as the following file types:
 
@@ -216,7 +216,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     port = 21
     user = tyrantlucifer
     password = tianchao
-    type = "text"
+    file_format_type = "text"
     schema = {
       name = string
       age = int
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 98500d620..e47cfd4cc 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -42,13 +42,13 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, HdfsSourceConfig.FILE_PATH.key(),
-                HdfsSourceConfig.FILE_TYPE.key(), HdfsSourceConfig.DEFAULT_FS.key());
+                HdfsSourceConfig.FILE_FORMAT_TYPE.key(), HdfsSourceConfig.DEFAULT_FS.key());
         if (!result.isSuccess()) {
             throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format("PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()));
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key());
         hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key()));
@@ -68,7 +68,7 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
             throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
         // support user-defined schema
-        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()).toUpperCase());
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         // only json text csv type support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
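
The `BaseHdfsFileSource` hunk above shows `prepare()` validating `file_format_type` and handing the raw string to `ReadStrategyFactory.of` to pick a reader. A hypothetical, simplified sketch of that dispatch; `TextReadStrategy` is real (its hunk appears below), while the JSON/ORC/PARQUET class names are illustrative stand-ins:

```java
import java.util.Locale;

// Hypothetical, simplified sketch of what ReadStrategyFactory.of does:
// map the file_format_type string onto a ReadStrategy implementation.
public class ReadStrategyFactorySketch {
    enum FileFormat { TEXT, CSV, JSON, ORC, PARQUET }

    interface ReadStrategy {}
    static class TextReadStrategy implements ReadStrategy {}    // real class
    static class JsonReadStrategy implements ReadStrategy {}    // illustrative
    static class OrcReadStrategy implements ReadStrategy {}     // illustrative
    static class ParquetReadStrategy implements ReadStrategy {} // illustrative

    static ReadStrategy of(String fileFormatType) {
        switch (FileFormat.valueOf(fileFormatType.toUpperCase(Locale.ROOT))) {
            case JSON:    return new JsonReadStrategy();
            case ORC:     return new OrcReadStrategy();
            case PARQUET: return new ParquetReadStrategy();
            default:      return new TextReadStrategy(); // TEXT and CSV
        }
    }

    public static void main(String[] args) {
        System.out.println(of("orc").getClass().getSimpleName()); // OrcReadStrategy
    }
}
```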
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index c38f43ae9..b57e94b03 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -73,9 +73,9 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
             this.fileNameExpression = config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key());
         }
 
-        if (config.hasPath(BaseSinkConfig.FILE_FORMAT.key()) &&
-                !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT.key()))) {
-            this.fileFormat = FileFormat.valueOf(config.getString(BaseSinkConfig.FILE_FORMAT.key()).toUpperCase(Locale.ROOT));
+        if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key()) &&
+                !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
+            this.fileFormat = FileFormat.valueOf(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()).toUpperCase(Locale.ROOT));
         }
 
         if (config.hasPath(BaseSinkConfig.DATE_FORMAT.key())) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 572a49fd3..0e0851744 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -138,10 +138,10 @@ public class BaseSinkConfig {
         .defaultValue(DateUtils.Formatter.YYYY_MM_DD_SPOT.getValue())
         .withDescription("Only used when `custom_filename` is true. The time format of the path");
 
-    public static final Option<FileFormat> FILE_FORMAT = Options.key("file_format")
+    public static final Option<FileFormat> FILE_FORMAT_TYPE = Options.key("file_format_type")
             .enumType(FileFormat.class)
             .defaultValue(FileFormat.CSV)
-            .withDescription("File format type");
+            .withDescription("File format type, e.g. csv, orc, parquet, text");
 
     public static final Option<List<String>> SINK_COLUMNS = Options.key("sink_columns")
             .listType()
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 9ddf90f8a..9d9942123 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -26,10 +26,10 @@ import org.apache.seatunnel.common.utils.TimeUtils;
 import java.util.List;
 
 public class BaseSourceConfig {
-    public static final Option<FileFormat> FILE_TYPE = Options.key("type")
+    public static final Option<FileFormat> FILE_FORMAT_TYPE = Options.key("file_format_type")
             .objectType(FileFormat.class)
             .noDefaultValue()
-            .withDescription("File type");
+            .withDescription("File format type, e.g. json, csv, text, parquet, orc, avro....");
 
     public static final Option<String> FILE_PATH = Options.key("path")
             .stringType()
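
With these two hunks, `BaseSinkConfig.FILE_FORMAT_TYPE` and `BaseSourceConfig.FILE_FORMAT_TYPE` resolve to the same `file_format_type` path, which call sites read via `pluginConfig.getString(FILE_FORMAT_TYPE.key())`. A toy stand-in for the `Option` class, just to show how `.key()` carries the renamed path (the real classes live in seatunnel-api):

```java
// Minimal stand-in for the SeaTunnel Option/Options pair; the generic
// parameter mirrors enumType/objectType in the hunks above.
public class OptionKeySketch {
    static final class Option<T> {
        private final String key;
        Option(String key) { this.key = key; }
        String key() { return key; }
    }

    static final Option<String> FILE_FORMAT_TYPE = new Option<>("file_format_type");

    public static void main(String[] args) {
        // Sink and source configs now expose the same config path:
        System.out.println(FILE_FORMAT_TYPE.key()); // file_format_type
    }
}
```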
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index e8df4655d..a401a8e43 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -99,7 +99,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
         if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER.key())) {
             fieldDelimiter = pluginConfig.getString(BaseSourceConfig.DELIMITER.key());
         } else {
-            FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_TYPE.key()).toUpperCase());
+            FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
             if (fileFormat == FileFormat.CSV) {
                 fieldDelimiter = ",";
             }
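
The `TextReadStrategy` hunk above keeps the delimiter fallback intact: an explicit `delimiter` option wins, and otherwise the strategy derives one from `file_format_type`, with CSV falling back to a comma. A sketch of that decision, assuming the `'\001'` default from the option tables for plain text:

```java
import java.util.Locale;
import java.util.Optional;

public class DelimiterFallbackSketch {
    enum FileFormat { TEXT, CSV }

    // Explicit `delimiter` wins; otherwise CSV falls back to a comma and
    // text keeps the '\001' default listed in the connector option tables.
    static String fieldDelimiter(Optional<String> configured, String fileFormatType) {
        if (configured.isPresent()) {
            return configured.get();
        }
        FileFormat format = FileFormat.valueOf(fileFormatType.toUpperCase(Locale.ROOT));
        return format == FileFormat.CSV ? "," : "\001";
    }

    public static void main(String[] args) {
        System.out.println(fieldDelimiter(Optional.empty(), "csv")); // ,
    }
}
```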
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index 35e611ced..b26c29f62 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -42,13 +42,13 @@ public class FtpFileSinkFactory implements TableSinkFactory {
             .required(FtpConfig.FTP_PORT)
             .required(FtpConfig.FTP_USERNAME)
             .required(FtpConfig.FTP_PASSWORD)
-            .optional(BaseSinkConfig.FILE_FORMAT)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+            .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
                 BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
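
Each sink factory's `OptionRule` now conditions on `FILE_FORMAT_TYPE`: text exposes the row/field delimiters plus text compression, csv and json expose text compression only, and orc/parquet expose their own codecs. A toy table (not the real `OptionRule` API) of what those `.conditional(...)` calls express; the compression key strings are derived from the constant names and are illustrative:

```java
import java.util.List;
import java.util.Map;

// Toy illustration of the .conditional(FILE_FORMAT_TYPE, value, options...)
// calls above: the listed options are only consulted when file_format_type
// matches the given value.
public class ConditionalRuleSketch {
    enum FileFormat { TEXT, CSV, JSON, ORC, PARQUET }

    static final Map<FileFormat, List<String>> DEPENDENT_OPTIONS = Map.of(
            FileFormat.TEXT, List.of("row_delimiter", "field_delimiter", "txt_compress"),
            FileFormat.CSV, List.of("txt_compress"),
            FileFormat.JSON, List.of("txt_compress"),
            FileFormat.ORC, List.of("orc_compress"),
            FileFormat.PARQUET, List.of("parquet_compress"));

    public static void main(String[] args) {
        System.out.println(DEPENDENT_OPTIONS.get(FileFormat.TEXT));
    }
}
```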
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index ddcd2d36d..b19322c08 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -51,19 +51,19 @@ public class FtpFileSource extends BaseFileSource {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
-                FtpConfig.FILE_PATH.key(), FtpConfig.FILE_TYPE.key(),
+                FtpConfig.FILE_PATH.key(), FtpConfig.FILE_FORMAT_TYPE.key(),
                 FtpConfig.FTP_HOST.key(), FtpConfig.FTP_PORT.key(),
                 FtpConfig.FTP_USERNAME.key(), FtpConfig.FTP_PASSWORD.key());
         if (!result.isSuccess()) {
             throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format("PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));        }
-        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(FtpConfig.FILE_TYPE.key()).toUpperCase());
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(FtpConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
             throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
                     "Ftp file source connector only support read [text, csv, json] files");
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_TYPE.key()));
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(FtpConfig.FILE_PATH.key());
         hadoopConf = FtpConf.buildWithConfig(pluginConfig);
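
`FtpFileSource.prepare` above still rejects columnar formats before building a read strategy, likely because ORC and PARQUET need random access that a plain FTP stream does not provide. A sketch of the guard, with `IllegalArgumentException` standing in for the connector's `FileConnectorException`:

```java
import java.util.Locale;

public class FtpFormatGuardSketch {
    enum FileFormat { TEXT, CSV, JSON, ORC, PARQUET }

    // Reject columnar formats up front, mirroring the check in the hunk above.
    static void checkFtpReadable(String fileFormatType) {
        FileFormat format = FileFormat.valueOf(fileFormatType.toUpperCase(Locale.ROOT));
        if (format == FileFormat.ORC || format == FileFormat.PARQUET) {
            throw new IllegalArgumentException(
                    "Ftp file source connector only support read [text, csv, json] files");
        }
    }

    public static void main(String[] args) {
        checkFtpReadable("json"); // accepted
        // checkFtpReadable("orc"); // would throw
    }
}
```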
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index c67b55cfa..1e011fae2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -46,9 +46,9 @@ public class FtpFileSourceFactory implements TableSourceFactory {
             .required(FtpConfig.FTP_PORT)
             .required(FtpConfig.FTP_USERNAME)
             .required(FtpConfig.FTP_PASSWORD)
-            .required(FtpConfig.FILE_TYPE)
-            .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
-            .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+            .required(FtpConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                 CatalogTableUtil.SCHEMA)
             .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
             .optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index 68fc6e749..e30611a08 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -39,13 +39,13 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
         return OptionRule.builder()
             .required(HdfsSourceConfig.DEFAULT_FS)
             .required(BaseSinkConfig.FILE_PATH)
-            .optional(BaseSinkConfig.FILE_FORMAT)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+            .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
                 BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index 393c5c487..11aaa71d7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -43,9 +43,9 @@ public class HdfsFileSourceFactory implements TableSourceFactory {
         return OptionRule.builder()
             .required(HdfsSourceConfig.FILE_PATH)
             .required(HdfsSourceConfig.DEFAULT_FS)
-            .required(BaseSourceConfig.FILE_TYPE)
-            .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
-            .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+            .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                 CatalogTableUtil.SCHEMA)
             .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
             .optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index bf7390c16..6c84323b3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -37,13 +37,13 @@ public class LocalFileSinkFactory implements TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
             .required(BaseSinkConfig.FILE_PATH)
-            .optional(BaseSinkConfig.FILE_FORMAT)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+            .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
                 BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 7ce2777b0..1c20a0811 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -53,13 +53,13 @@ public class LocalFileSource extends BaseFileSource {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, LocalSourceConfig.FILE_PATH.key(),
-                LocalSourceConfig.FILE_TYPE.key());
+                LocalSourceConfig.FILE_FORMAT_TYPE.key());
         if (!result.isSuccess()) {
             throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format("PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()));
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(LocalSourceConfig.FILE_PATH.key());
         hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
@@ -70,7 +70,7 @@ public class LocalFileSource extends BaseFileSource {
             throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
         // support user-defined schema
-        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()).toUpperCase());
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         // only json, text and csv types support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
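
The renamed key feeds straight into the read-strategy lookup shown above. A minimal usage sketch, assuming ReadStrategyFactory lives alongside TextReadStrategy in the source/reader package:

    import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
    import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;

    // The config value selects the reader implementation; judging by the
    // valueOf(...toUpperCase()) pattern in these sources, the match is
    // effectively case-insensitive.
    ReadStrategy strategy = ReadStrategyFactory.of("json");
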
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index 69f515bb1..3b982e276 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -42,9 +42,9 @@ public class LocalFileSourceFactory implements TableSourceFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
             .required(LocalSourceConfig.FILE_PATH)
-            .required(BaseSourceConfig.FILE_TYPE)
-            .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
-            .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+            .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                 CatalogTableUtil.SCHEMA)
             .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
             .optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 8495d95b9..f44376f51 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -42,13 +42,13 @@ public class OssFileSinkFactory implements TableSinkFactory {
             .required(OssConfig.ACCESS_KEY)
             .required(OssConfig.ACCESS_SECRET)
             .required(OssConfig.ENDPOINT)
-            .optional(BaseSinkConfig.FILE_FORMAT)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+            .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
                 BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 8c974ac67..dc52bcc7a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -52,7 +52,7 @@ public class OssFileSource extends BaseFileSource {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
-                OssConfig.FILE_PATH.key(), OssConfig.FILE_TYPE.key(),
+                OssConfig.FILE_PATH.key(), OssConfig.FILE_FORMAT_TYPE.key(),
                 OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(),
                 OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key());
         if (!result.isSuccess()) {
@@ -60,7 +60,7 @@ public class OssFileSource extends BaseFileSource {
                     String.format("PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
         hadoopConf = OssConf.buildWithConfig(pluginConfig);
@@ -71,7 +71,7 @@ public class OssFileSource extends BaseFileSource {
             throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
         // support user-defined schema
-        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         // only json, text and csv types support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
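
One incidental detail, unchanged by this commit: the checkAllExists call in this hunk, and its twin in the non-jindo OssFileSource below, passes OssConfig.BUCKET.key() twice. The duplicate is harmless but redundant; a deduplicated sketch, assuming no additional required key was intended, would be:

    CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
            OssConfig.FILE_PATH.key(), OssConfig.FILE_FORMAT_TYPE.key(),
            OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(),
            OssConfig.ACCESS_SECRET.key());
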
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 721b55f0f..a9b7390f6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -46,9 +46,9 @@ public class OssFileSourceFactory implements TableSourceFactory {
             .required(OssConfig.ACCESS_KEY)
             .required(OssConfig.ACCESS_SECRET)
             .required(OssConfig.ENDPOINT)
-            .required(BaseSourceConfig.FILE_TYPE)
-            .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
-            .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+            .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                 CatalogTableUtil.SCHEMA)
             .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
             .optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 8495d95b9..f44376f51 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -42,13 +42,13 @@ public class OssFileSinkFactory implements TableSinkFactory {
             .required(OssConfig.ACCESS_KEY)
             .required(OssConfig.ACCESS_SECRET)
             .required(OssConfig.ENDPOINT)
-            .optional(BaseSinkConfig.FILE_FORMAT)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+            .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
                 BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 83d981996..cc9c80f3c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -51,7 +51,7 @@ public class OssFileSource extends BaseFileSource {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
-                OssConfig.FILE_PATH.key(), OssConfig.FILE_TYPE.key(),
+                OssConfig.FILE_PATH.key(), OssConfig.FILE_FORMAT_TYPE.key(),
                 OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(),
                 OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key());
         if (!result.isSuccess()) {
@@ -59,7 +59,7 @@ public class OssFileSource extends BaseFileSource {
                     String.format("PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
         hadoopConf = OssConf.buildWithConfig(pluginConfig);
@@ -70,7 +70,7 @@ public class OssFileSource extends BaseFileSource {
             throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
         // support user-defined schema
-        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         // only json, text and csv types support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 721b55f0f..a9b7390f6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -46,9 +46,9 @@ public class OssFileSourceFactory implements TableSourceFactory {
             .required(OssConfig.ACCESS_KEY)
             .required(OssConfig.ACCESS_SECRET)
             .required(OssConfig.ENDPOINT)
-            .required(BaseSourceConfig.FILE_TYPE)
-            .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
-            .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+            .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                 CatalogTableUtil.SCHEMA)
             .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
             .optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
index b8cc5aa00..fdeb41b7e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
@@ -69,10 +69,10 @@ public class S3Catalog implements Catalog {
 
     @Override
     public void open() throws CatalogException {
-        ReadStrategy readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+        ReadStrategy readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(s3Config);
         this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key());
-        readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+        readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(s3Config);
         try {
             fileSystem = FileSystem.get(readStrategy.getConfiguration(S3Conf.buildWithConfig(s3Config)));
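
Also visible in this hunk, and predating the rename: open() constructs and configures the read strategy twice in a row. A single-construction sketch, assuming setPluginConfig has no side effects that depend on being called twice:

    ReadStrategy readStrategy =
            ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
    readStrategy.setPluginConfig(s3Config);
    this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key());
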
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
index 4c245d161..9c2827f15 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3DataTypeConvertor.java
@@ -25,7 +25,6 @@ import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertor;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 
 import com.google.auto.service.AutoService;
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 3a0a95b41..b9c598b89 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -43,13 +43,13 @@ public class S3FileSinkFactory implements TableSinkFactory {
                 .required(S3Config.S3A_AWS_CREDENTIALS_PROVIDER)
                 .conditional(S3Config.S3A_AWS_CREDENTIALS_PROVIDER, S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider, S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
                 .optional(S3Config.S3_PROPERTIES)
-                .optional(BaseSinkConfig.FILE_FORMAT)
-                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+                .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
                     BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
-                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
-                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
-                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
-                .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+                .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+                .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+                .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+                .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
                 .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION, BaseSinkConfig.FILENAME_TIME_FORMAT)
                 .optional(BaseSinkConfig.HAVE_PARTITION)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index 12f0afbfc..ce93542e7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -51,13 +51,13 @@ public class S3FileSource extends BaseFileSource {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
-                S3Config.FILE_PATH.key(), S3Config.FILE_TYPE.key(), S3Config.S3_BUCKET.key());
+                S3Config.FILE_PATH.key(), S3Config.FILE_FORMAT_TYPE.key(), S3Config.S3_BUCKET.key());
         if (!result.isSuccess()) {
             throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
                     String.format("PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_TYPE.key()));
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(S3Config.FILE_PATH.key());
         hadoopConf = S3Conf.buildWithConfig(pluginConfig);
@@ -68,7 +68,7 @@ public class S3FileSource extends BaseFileSource {
             throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
         // support user-defined schema
-        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE.key()).toUpperCase());
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_FORMAT_TYPE.key()).toUpperCase());
         // only json, text and csv types support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
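
The toUpperCase() call above is what makes the renamed key's value case-insensitive. A self-contained sketch, assuming FileFormat sits in the connector's config package as the other hunks suggest:

    import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;

    // "json", "Json" and "JSON" all resolve to the same constant; an unknown
    // value such as "jsonl" makes valueOf throw IllegalArgumentException, so a
    // typo fails fast during prepare().
    FileFormat format = FileFormat.valueOf("json".toUpperCase()); // FileFormat.JSON
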
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index d2f9d429f..78800f0cb 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -42,7 +42,7 @@ public class S3FileSourceFactory implements TableSourceFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
             .required(S3Config.FILE_PATH)
-            .required(S3Config.FILE_TYPE)
+            .required(S3Config.FILE_FORMAT_TYPE)
             .required(S3Config.S3_BUCKET)
             .required(S3Config.FS_S3A_ENDPOINT)
             .required(S3Config.S3A_AWS_CREDENTIALS_PROVIDER)
@@ -50,8 +50,8 @@ public class S3FileSourceFactory implements TableSourceFactory {
                 S3Config.S3aAwsCredentialsProvider.SimpleAWSCredentialsProvider, S3Config.S3_ACCESS_KEY,
                 S3Config.S3_SECRET_KEY)
             .optional(S3Config.S3_PROPERTIES)
-            .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
-            .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                 CatalogTableUtil.SCHEMA)
             .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
             .optional(BaseSourceConfig.DATE_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index bc61079fe..5bf8bcf70 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -42,13 +42,13 @@ public class SftpFileSinkFactory implements TableSinkFactory {
             .required(SftpConfig.SFTP_PORT)
             .required(SftpConfig.SFTP_USERNAME)
             .required(SftpConfig.SFTP_PASSWORD)
-            .optional(BaseSinkConfig.FILE_FORMAT)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
+            .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSinkConfig.ROW_DELIMITER,
                 BaseSinkConfig.FIELD_DELIMITER, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
-            .conditional(BaseSinkConfig.FILE_FORMAT, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+            .conditional(BaseSinkConfig.FILE_FORMAT_TYPE, FileFormat.PARQUET, BaseSinkConfig.PARQUET_COMPRESS)
             .optional(BaseSinkConfig.CUSTOM_FILENAME)
             .conditional(BaseSinkConfig.CUSTOM_FILENAME, true, BaseSinkConfig.FILE_NAME_EXPRESSION,
                 BaseSinkConfig.FILENAME_TIME_FORMAT)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index b4d2ab23e..359f3b384 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -51,7 +51,7 @@ public class SftpFileSource extends BaseFileSource {
     @Override
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
-                SftpConfig.FILE_PATH.key(), SftpConfig.FILE_TYPE.key(),
+                SftpConfig.FILE_PATH.key(), SftpConfig.FILE_FORMAT_TYPE.key(),
                 SftpConfig.SFTP_HOST.key(), SftpConfig.SFTP_PORT.key(),
                 SftpConfig.SFTP_USERNAME.key(), SftpConfig.SFTP_PASSWORD.key());
         if (!result.isSuccess()) {
@@ -59,12 +59,12 @@ public class SftpFileSource extends BaseFileSource {
                     String.format("PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(SftpConfig.FILE_TYPE.key()).toUpperCase());
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(SftpConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
             throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
                     "Sftp file source connector only support read [text, csv, json] files");
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_TYPE.key()));
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(SftpConfig.FILE_PATH.key());
         hadoopConf = SftpConf.buildWithConfig(pluginConfig);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index fa434d685..d80b80aae 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -46,9 +46,9 @@ public class SftpFileSourceFactory implements TableSourceFactory {
             .required(SftpConfig.SFTP_PORT)
             .required(SftpConfig.SFTP_USERNAME)
             .required(SftpConfig.SFTP_PASSWORD)
-            .required(BaseSourceConfig.FILE_TYPE)
-            .conditional(BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
-            .conditional(BaseSourceConfig.FILE_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
+            .required(BaseSourceConfig.FILE_FORMAT_TYPE)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+            .conditional(BaseSourceConfig.FILE_FORMAT_TYPE, Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                 CatalogTableUtil.SCHEMA)
             .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
             .optional(BaseSourceConfig.DATE_FORMAT)
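
This rename is user-visible: job configs that still spell the old key will now fail validation. A hypothetical compatibility shim, not part of this commit, could bridge the two spellings:

    // Hypothetical helper: prefer the new key, fall back to the legacy one so
    // pre-rename job configs keep working during a migration window.
    private static String resolveFileFormat(Config pluginConfig) {
        if (pluginConfig.hasPath("file_format_type")) {
            return pluginConfig.getString("file_format_type");
        }
        return pluginConfig.getString("file_type"); // legacy key before this change
    }
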
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 09a6d85d7..f90db5b83 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -18,7 +18,7 @@
 package org.apache.seatunnel.connectors.seatunnel.hive.sink;
 
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
-import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_PATH;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.IS_PARTITION_FIELD_WRITE_IN_FILE;
@@ -103,13 +103,13 @@ public class HiveSink extends BaseHdfsFileSink {
         String outputFormat = tableInformation.getSd().getOutputFormat();
         if (TEXT_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
             Map<String, String> parameters = tableInformation.getSd().getSerdeInfo().getParameters();
-            pluginConfig = pluginConfig.withValue(FILE_FORMAT.key(), ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
+            pluginConfig = pluginConfig.withValue(FILE_FORMAT_TYPE.key(), ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
                 .withValue(FIELD_DELIMITER.key(), ConfigValueFactory.fromAnyRef(parameters.get("field.delim")))
                 .withValue(ROW_DELIMITER.key(), ConfigValueFactory.fromAnyRef(parameters.get("line.delim")));
         } else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
-            pluginConfig = pluginConfig.withValue(FILE_FORMAT.key(), ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
+            pluginConfig = pluginConfig.withValue(FILE_FORMAT_TYPE.key(), ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
         } else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
-            pluginConfig = pluginConfig.withValue(FILE_FORMAT.key(), ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
+            pluginConfig = pluginConfig.withValue(FILE_FORMAT_TYPE.key(), ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
         } else {
             throw new HiveConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
                     "Hive connector only support [text parquet orc] table now");
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 7c58e3c47..378cee682 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -89,13 +89,13 @@ public class HiveSource extends BaseHdfsFileSource {
         tableInformation = tableInfo.getRight();
         String inputFormat = tableInformation.getSd().getInputFormat();
         if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
-            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE.key(),
+            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_FORMAT_TYPE.key(),
                     ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
         } else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
-            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE.key(),
+            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_FORMAT_TYPE.key(),
                     ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
         } else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
-            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_TYPE.key(),
+            pluginConfig = pluginConfig.withValue(BaseSourceConfig.FILE_FORMAT_TYPE.key(),
                     ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
         } else {
             throw new HiveConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
index af463ce03..4b9f11cab 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
 
 import static com.google.common.base.Preconditions.checkNotNull;
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
index 696fd03a3..b66421687 100644
--- a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
@@ -40,7 +40,7 @@ public class S3RedshiftFactory implements TableSinkFactory {
         return OptionRule.builder()
             .required(S3Config.S3_BUCKET, S3RedshiftConfig.JDBC_URL, S3RedshiftConfig.JDBC_USER, S3RedshiftConfig.JDBC_PASSWORD, S3RedshiftConfig.EXECUTE_SQL, BaseSourceConfig.FILE_PATH)
             .optional(S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
-            .optional(BaseSinkConfig.FILE_FORMAT)
+            .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
             .optional(BaseSinkConfig.FILENAME_TIME_FORMAT)
             .optional(BaseSinkConfig.FIELD_DELIMITER)
             .optional(BaseSinkConfig.ROW_DELIMITER)