Posted to commits@seatunnel.apache.org by zo...@apache.org on 2023/03/04 03:27:46 UTC

[incubator-seatunnel] 04/04: Change file type to file_format_type in file source/sink (#4249)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch cdc-multiple-table
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git

commit 0b8fb1a4aa6efef21c5e54ece5c78fb062abeb00
Author: Wenjun Ruan <we...@apache.org>
AuthorDate: Fri Mar 3 15:57:42 2023 +0800

    Change file type to file_format_type in file source/sink (#4249)
    
    # Conflicts:
    #       docs/en/connector-v2/sink/OssFile.md
    #       docs/en/connector-v2/sink/OssJindoFile.md
    #       docs/en/connector-v2/sink/S3-Redshift.md
    #       docs/en/connector-v2/sink/SftpFile.md
    #       docs/en/connector-v2/source/FtpFile.md
    #       docs/en/connector-v2/source/LocalFile.md
    #       docs/en/connector-v2/source/SftpFile.md
    #       seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
    #       seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
    #       seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
    #       seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
    #       seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
    #       seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
    #       seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
    #       seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
    #       seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
    #       seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
    #       seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
    #       seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
    #       seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
    #       seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
    #       seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
    #       seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
    #       seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
---
 docs/en/connector-v2/sink/HdfsFile.md                | 10 +++++-----
 docs/en/connector-v2/sink/OssFile.md                 | 12 ++++++------
 docs/en/connector-v2/sink/OssJindoFile.md            | 12 ++++++------
 docs/en/connector-v2/sink/S3-Redshift.md             | 12 ++++++------
 docs/en/connector-v2/sink/S3File.md                  | 12 ++++++------
 docs/en/connector-v2/sink/SftpFile.md                |  8 ++++----
 docs/en/connector-v2/source/FtpFile.md               |  8 ++++----
 docs/en/connector-v2/source/HdfsFile.md              |  8 ++++----
 docs/en/connector-v2/source/LocalFile.md             | 10 +++++-----
 docs/en/connector-v2/source/OssFile.md               | 10 +++++-----
 docs/en/connector-v2/source/OssJindoFile.md          | 10 +++++-----
 docs/en/connector-v2/source/S3File.md                | 10 +++++-----
 docs/en/connector-v2/source/SftpFile.md              |  8 ++++----
 .../file/hdfs/source/BaseHdfsFileSource.java         |  9 ++++++---
 .../seatunnel/file/config/BaseFileSinkConfig.java    |  6 +++---
 .../seatunnel/file/config/BaseSinkConfig.java        |  6 +++---
 .../seatunnel/file/config/BaseSourceConfig.java      |  7 ++++---
 .../file/source/reader/TextReadStrategy.java         |  4 +++-
 .../seatunnel/file/ftp/sink/FtpFileSinkFactory.java  | 18 ++++++++++++------
 .../seatunnel/file/ftp/source/FtpFileSource.java     |  8 +++++---
 .../file/ftp/source/FtpFileSourceFactory.java        |  8 +++++---
 .../file/hdfs/sink/HdfsFileSinkFactory.java          | 18 ++++++++++++------
 .../file/hdfs/source/HdfsFileSourceFactory.java      |  8 +++++---
 .../file/local/sink/LocalFileSinkFactory.java        | 18 ++++++++++++------
 .../seatunnel/file/local/source/LocalFileSource.java |  9 ++++++---
 .../file/local/source/LocalFileSourceFactory.java    |  8 +++++---
 .../seatunnel/file/oss/sink/OssFileSinkFactory.java  | 18 ++++++++++++------
 .../seatunnel/file/oss/source/OssFileSource.java     |  8 +++++---
 .../file/oss/source/OssFileSourceFactory.java        |  8 +++++---
 .../seatunnel/file/oss/sink/OssFileSinkFactory.java  | 18 ++++++++++++------
 .../seatunnel/file/oss/source/OssFileSource.java     |  8 +++++---
 .../file/oss/source/OssFileSourceFactory.java        |  8 +++++---
 .../seatunnel/file/s3/catalog/S3Catalog.java         |  4 ++--
 .../seatunnel/file/s3/sink/S3FileSinkFactory.java    | 18 ++++++++++++------
 .../seatunnel/file/s3/source/S3FileSource.java       |  8 +++++---
 .../file/s3/source/S3FileSourceFactory.java          |  8 +++++---
 .../file/sftp/sink/SftpFileSinkFactory.java          | 18 ++++++++++++------
 .../seatunnel/file/sftp/source/SftpFileSource.java   |  7 ++++---
 .../file/sftp/source/SftpFileSourceFactory.java      |  8 +++++---
 .../connectors/seatunnel/hive/sink/HiveSink.java     | 10 ++++------
 .../connectors/seatunnel/hive/source/HiveSource.java |  6 +++---
 .../catalog/redshift/RedshiftDataTypeConvertor.java  | 20 +++++++++++++++++++-
 .../seatunnel/redshift/sink/S3RedshiftFactory.java   |  2 +-
 .../test/resources/json/fake_to_local_file_json.conf |  2 +-
 .../resources/json/local_file_json_to_assert.conf    |  2 +-
 .../test/resources/orc/fake_to_local_file_orc.conf   |  2 +-
 .../test/resources/orc/local_file_orc_to_assert.conf |  4 ++--
 .../parquet/fake_to_local_file_parquet.conf          |  2 +-
 .../parquet/local_file_parquet_to_assert.conf        |  2 +-
 .../test/resources/text/fake_to_local_file_text.conf |  2 +-
 .../resources/text/local_file_text_skip_headers.conf |  2 +-
 .../resources/text/local_file_text_to_assert.conf    |  2 +-
 .../src/test/resources/batch_fakesource_to_file.conf |  2 +-
 .../resources/batch_fakesource_to_file_complex.conf  |  2 +-
 .../cluster_batch_fake_to_localfile_template.conf    |  2 +-
 ...atch_fake_to_localfile_two_pipeline_template.conf |  2 +-
 .../streaming_fakesource_to_file_complex.conf        |  2 +-
 57 files changed, 277 insertions(+), 187 deletions(-)
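
Note: the rename is purely a key rename. Sinks previously read `file_format` and sources read `type`; both now read `file_format_type`. A minimal before/after sketch (connector and values are illustrative only, not taken from the diff below):

    # sink, before
    LocalFile {
      path = "/tmp/seatunnel/text"
      file_format = "text"
    }
    # sink, after
    LocalFile {
      path = "/tmp/seatunnel/text"
      file_format_type = "text"
    }

    # source, before
    LocalFile {
      path = "/tmp/seatunnel/text"
      type = "text"
    }
    # source, after
    LocalFile {
      path = "/tmp/seatunnel/text"
      file_format_type = "text"
    }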

diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index f49b34348..b627c4cf0 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -20,7 +20,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -39,7 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                      | Whether you need a custom filename                        |
 | file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                    |
-| file_format                      | string  | no       | "csv"                                      |                                                           |
+| file_format_type                 | string  | no       | "csv"                                      |                                                           |
 | field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format is text                        |
 | row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format is text                        |
 | have_partition                   | boolean | no       | false                                      | Whether you need to process partitions.                   |
@@ -95,7 +95,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We support the following file types:
 
@@ -198,7 +198,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
 HdfsFile {
     fs.defaultFS = "hdfs://hadoopcluster"
     path = "/tmp/hive/warehouse/test2"
-    file_format = "text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
@@ -228,7 +228,7 @@ HdfsFile {
     custom_filename = true
     file_name_expression = "${transactionId}_${now}"
     filename_time_format = "yyyy.MM.dd"
-    file_format = "parquet"
+    file_format_type = "parquet"
     sink_columns = ["name","age"]
     is_enable_transaction = true
 }
diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
index 7c4b2b4e0..c9592e35c 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -23,7 +23,7 @@ It only supports hadoop version **2.9.X+**.
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                      | Whether you need a custom filename                        |
 | file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                    |
-| file_format                      | string  | no       | "csv"                                      |                                                           |
+| file_format_type                 | string  | no       | "csv"                                      |                                                           |
 | field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format is text                        |
 | row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format is text                        |
 | have_partition                   | boolean | no       | false                                      | Whether you need to process partitions.                   |
@@ -103,7 +103,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We support the following file types:
 
@@ -188,7 +188,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
@@ -218,7 +218,7 @@ For parquet file format with `have_partition` and `sink_columns`
     partition_by = ["age"]
     partition_dir_expression = "${k0}=${v0}"
     is_partition_field_write_in_file = true
-    file_format = "parquet"
+    file_format_type = "parquet"
     sink_columns = ["name","age"]
   }
 
@@ -234,7 +234,7 @@ For orc file format simple config
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "orc"
+    file_format_type = "orc"
   }
 
 ```
diff --git a/docs/en/connector-v2/sink/OssJindoFile.md b/docs/en/connector-v2/sink/OssJindoFile.md
index 5447a7098..a43e00562 100644
--- a/docs/en/connector-v2/sink/OssJindoFile.md
+++ b/docs/en/connector-v2/sink/OssJindoFile.md
@@ -23,7 +23,7 @@ It only supports hadoop version **2.9.X+**.
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                      | Whether you need a custom filename                        |
 | file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                    |
-| file_format                      | string  | no       | "csv"                                      |                                                           |
+| file_format_type                 | string  | no       | "csv"                                      |                                                           |
 | field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format is text                        |
 | row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format is text                        |
 | have_partition                   | boolean | no       | false                                      | Whether you need to process partitions.                   |
@@ -103,7 +103,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We support the following file types:
 
@@ -188,7 +188,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
@@ -214,7 +214,7 @@ For parquet file format with `sink_columns`
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "parquet"
+    file_format_type = "parquet"
     sink_columns = ["name","age"]
   }
 
@@ -230,7 +230,7 @@ For orc file format simple config
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    file_format = "orc"
+    file_format_type = "orc"
   }
 
 ```
diff --git a/docs/en/connector-v2/sink/S3-Redshift.md b/docs/en/connector-v2/sink/S3-Redshift.md
index 331ca1599..978ffc7c9 100644
--- a/docs/en/connector-v2/sink/S3-Redshift.md
+++ b/docs/en/connector-v2/sink/S3-Redshift.md
@@ -17,7 +17,7 @@ Output data to AWS Redshift.
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -38,7 +38,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | access_secret                    | string  | no       | -                                                         |
 | hadoop_s3_properties             | map     | no       | -                                                         |
 | file_name_expression             | string  | no       | "${transactionId}"                                        |
-| file_format                      | string  | no       | "text"                                                    |
+| file_format_type                 | string  | no       | "text"                                                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                                              |
 | field_delimiter                  | string  | no       | '\001'                                                    |
 | row_delimiter                    | string  | no       | "\n"                                                      |
@@ -118,7 +118,7 @@ hadoop_s3_properties {
 
 Please note that, if `is_enable_transaction` is `true`, `${transactionId}_` is automatically prepended to the file name.
 
-### file_format [string]
+### file_format_type [string]
 
 We support the following file types:
 
@@ -206,7 +206,7 @@ For text file format
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
-    file_format="text"
+    file_format_type = "text"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     hadoop_s3_properties {
@@ -234,7 +234,7 @@ For parquet file format
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
-    file_format="parquet"
+    file_format_type = "parquet"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     hadoop_s3_properties {
@@ -262,7 +262,7 @@ For orc file format
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
-    file_format="orc"
+    file_format_type = "orc"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     hadoop_s3_properties {
diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
index 4229564c0..c544ae63b 100644
--- a/docs/en/connector-v2/sink/S3File.md
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -22,7 +22,7 @@ To use this connector you need put hadoop-aws-3.1.4.jar and aws-java-sdk-bundle-
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -42,7 +42,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                                 | Whether you need a custom filename                                                                     |
 | file_name_expression             | string  | no       | "${transactionId}"                                    | Only used when custom_filename is true                                                                 |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                                          | Only used when custom_filename is true                                                                 |
-| file_format                      | string  | no       | "csv"                                                 |                                                                                                        |
+| file_format_type                 | string  | no       | "csv"                                                 |                                                                                                        |
 | field_delimiter                  | string  | no       | '\001'                                                | Only used when file_format is text                                                                     |
 | row_delimiter                    | string  | no       | "\n"                                                  | Only used when file_format is text                                                                     |
 | have_partition                   | boolean | no       | false                                                 | Whether you need to process partitions.                                                                |
@@ -120,7 +120,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We support the following file types:
 
@@ -205,7 +205,7 @@ For text file format with `have_partition` and `custom_filename` and `sink_colum
     path="/seatunnel/text"
     fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
     fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
-    file_format="text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
@@ -237,7 +237,7 @@ For parquet file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCr
     fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
     access_key = "xxxxxxxxxxxxxxxxx"
     secret_key = "xxxxxxxxxxxxxxxxx"
-    file_format="parquet"
+    file_format_type = "parquet"
     hadoop_s3_properties {
       "fs.s3a.buffer.dir" = "/data/st_test/s3a"
       "fs.s3a.fast.upload.buffer" = "disk"
@@ -258,7 +258,7 @@ For orc file format simple config with `org.apache.hadoop.fs.s3a.SimpleAWSCreden
     fs.s3a.aws.credentials.provider="org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
     access_key = "xxxxxxxxxxxxxxxxx"
     secret_key = "xxxxxxxxxxxxxxxxx"
-    file_format="orc"
+    file_format_type = "orc"
   }
 
 ```
diff --git a/docs/en/connector-v2/sink/SftpFile.md b/docs/en/connector-v2/sink/SftpFile.md
index 0ee306c80..ac17d50e4 100644
--- a/docs/en/connector-v2/sink/SftpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -20,7 +20,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 
 By default, we use 2PC commit to ensure `exactly-once`
 
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -39,7 +39,7 @@ By default, we use 2PC commit to ensure `exactly-once`
 | custom_filename                  | boolean | no       | false                                      | Whether you need a custom filename                        |
 | file_name_expression             | string  | no       | "${transactionId}"                         | Only used when custom_filename is true                    |
 | filename_time_format             | string  | no       | "yyyy.MM.dd"                               | Only used when custom_filename is true                    |
-| file_format                      | string  | no       | "csv"                                      |                                                           |
+| file_format_type                 | string  | no       | "csv"                                      |                                                           |
 | field_delimiter                  | string  | no       | '\001'                                     | Only used when file_format is text                        |
 | row_delimiter                    | string  | no       | "\n"                                       | Only used when file_format is text                        |
 | have_partition                   | boolean | no       | false                                      | Whether you need to process partitions.                   |
@@ -100,7 +100,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 | m      | Minute in hour     |
 | s      | Second in minute   |
 
-### file_format [string]
+### file_format_type [string]
 
 We support the following file types:
 
@@ -185,7 +185,7 @@ SftpFile {
     username = "username"
     password = "password"
     path = "/data/sftp"
-    file_format = "text"
+    file_format_type = "text"
     field_delimiter = "\t"
     row_delimiter = "\n"
     have_partition = true
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 124fac7a0..bc5c0519e 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 - [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] json
@@ -36,7 +36,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | user                      | string  | yes      | -                   |
 | password                  | string  | yes      | -                   |
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | read_columns              | list    | no       | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
@@ -139,7 +139,7 @@ The file type supported column projection as the following shown:
 
 **Tips: If you want to use this feature when reading `text`, `json`, or `csv` files, the schema option must be configured**
 
-### type [string]
+### file_format_type [string]
 
 File type. The following file types are supported:
 
@@ -230,7 +230,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     port = 21
     user = tyrantlucifer
     password = tianchao
-    type = "text"
+    file_format_type = "text"
     schema = {
       name = string
       age = int
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index cde7be449..9bc27bfff 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -25,7 +25,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -37,7 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 |           name            |  type   | required |    default value    |
 |---------------------------|---------|----------|---------------------|
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | fs.defaultFS              | string  | yes      | -                   |
 | read_columns              | list    | yes      | -                   |
 | hdfs_site_path            | string  | no       | -                   |
@@ -110,7 +110,7 @@ For example, set like following:
 
 then SeaTunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type. The following file types are supported:
 
@@ -244,7 +244,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 HdfsFile {
   path = "/apps/hive/demo/student"
-  type = "parquet"
+  file_format_type = "parquet"
   fs.defaultFS = "hdfs://namenode001"
 }
 
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index 6a74d18ab..69f517075 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -25,7 +25,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -37,7 +37,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 |           name            |  type   | required |    default value    |
 |---------------------------|---------|----------|---------------------|
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | read_columns              | list    | no       | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
@@ -106,7 +106,7 @@ For example, set like following:
 
 then SeaTunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type. The following file types are supported:
 
@@ -224,7 +224,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 
 LocalFile {
   path = "/apps/hive/demo/student"
-  type = "parquet"
+  file_format_type = "parquet"
 }
 
 ```
@@ -239,7 +239,7 @@ LocalFile {
     }
   }
   path = "/apps/hive/demo/student"
-  type = "json"
+  file_format_type = "json"
 }
 
 ```
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index 0ca6eeb50..7479ee038 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -28,7 +28,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -40,7 +40,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 |           name            |  type   | required |    default value    |
 |---------------------------|---------|----------|---------------------|
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | bucket                    | string  | yes      | -                   |
 | access_key                | string  | yes      | -                   |
 | access_secret             | string  | yes      | -                   |
@@ -113,7 +113,7 @@ For example, set like following:
 
 then SeaTunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type. The following file types are supported:
 
@@ -251,7 +251,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    type = "orc"
+    file_format_type = "orc"
   }
 
 ```
@@ -264,7 +264,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    type = "json"
+    file_format_type = "json"
     schema {
       fields {
         id = int 
diff --git a/docs/en/connector-v2/source/OssJindoFile.md b/docs/en/connector-v2/source/OssJindoFile.md
index fef93da5b..f7efeb7fe 100644
--- a/docs/en/connector-v2/source/OssJindoFile.md
+++ b/docs/en/connector-v2/source/OssJindoFile.md
@@ -28,7 +28,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [ ] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -40,7 +40,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 |           name            |  type   | required |    default value    |
 |---------------------------|---------|----------|---------------------|
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | bucket                    | string  | yes      | -                   |
 | access_key                | string  | yes      | -                   |
 | access_secret             | string  | yes      | -                   |
@@ -113,7 +113,7 @@ For example, set like following:
 
 then SeaTunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type. The following file types are supported:
 
@@ -251,7 +251,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    type = "orc"
+    file_format_type = "orc"
   }
 
 ```
@@ -264,7 +264,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    type = "json"
+    file_format_type = "json"
     schema {
       fields {
         id = int 
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
index 65611e770..dee60e979 100644
--- a/docs/en/connector-v2/source/S3File.md
+++ b/docs/en/connector-v2/source/S3File.md
@@ -27,7 +27,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 - [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] parquet
@@ -39,7 +39,7 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 |              name               |  type   | required |                     default value                     |
 |---------------------------------|---------|----------|-------------------------------------------------------|
 | path                            | string  | yes      | -                                                     |
-| type                            | string  | yes      | -                                                     |
+| file_format_type                | string  | yes      | -                                                     |
 | bucket                          | string  | yes      | -                                                     |
 | fs.s3a.endpoint                 | string  | yes      | -                                                     |
 | fs.s3a.aws.credentials.provider | string  | yes      | com.amazonaws.auth.InstanceProfileCredentialsProvider |
@@ -124,7 +124,7 @@ For example, set like following:
 
 then SeaTunnel will skip the first 2 lines from source files
 
-### type [string]
+### file_format_type [string]
 
 File type. The following file types are supported:
 
@@ -269,7 +269,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     access_key = "xxxxxxxxxxxxxxxxx"
     secret_key = "xxxxxxxxxxxxxxxxx"
     bucket = "s3a://seatunnel-test"
-    type = "orc"
+    file_format_type = "orc"
   }
 
 ```
@@ -281,7 +281,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     bucket = "s3a://seatunnel-test"
     fs.s3a.endpoint="s3.cn-north-1.amazonaws.com.cn"
     fs.s3a.aws.credentials.provider="com.amazonaws.auth.InstanceProfileCredentialsProvider"
-    type = "json"
+    file_format_type = "json"
     schema {
       fields {
         id = int 
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
index 1563874ae..fd27d4fb3 100644
--- a/docs/en/connector-v2/source/SftpFile.md
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -22,7 +22,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 - [x] [column projection](../../concept/connector-v2-features.md)
 - [x] [parallelism](../../concept/connector-v2-features.md)
 - [ ] [support user-defined split](../../concept/connector-v2-features.md)
-- [x] file format
+- [x] file format type
   - [x] text
   - [x] csv
   - [x] json
@@ -36,7 +36,7 @@ If you use SeaTunnel Engine, It automatically integrated the hadoop jar when you
 | user                      | string  | yes      | -                   |
 | password                  | string  | yes      | -                   |
 | path                      | string  | yes      | -                   |
-| type                      | string  | yes      | -                   |
+| file_format_type          | string  | yes      | -                   |
 | delimiter                 | string  | no       | \001                |
 | parse_partition_from_path | boolean | no       | true                |
 | date_format               | string  | no       | yyyy-MM-dd          |
@@ -138,7 +138,7 @@ The file type supported column projection as the following shown:
 
 **Tips: If you want to use this feature when reading `text`, `json`, or `csv` files, the schema option must be configured**
 
-### type [string]
+### file_format_type [string]
 
 File type. The following file types are supported:
 
@@ -229,7 +229,7 @@ Source plugin common parameters, please refer to [Source Common Options](common-
     port = 21
     user = tyrantlucifer
     password = tianchao
-    type = "text"
+    file_format_type = "text"
     schema = {
       name = string
       age = int
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 00dff82b0..2a64a6254 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -45,7 +45,7 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
                         HdfsSourceConfig.FILE_PATH.key(),
-                        HdfsSourceConfig.FILE_TYPE.key(),
+                        HdfsSourceConfig.FILE_FORMAT_TYPE.key(),
                         HdfsSourceConfig.DEFAULT_FS.key());
         if (!result.isSuccess()) {
             throw new FileConnectorException(
@@ -55,7 +55,8 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
         readStrategy =
-                ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()));
+                ReadStrategyFactory.of(
+                        pluginConfig.getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH.key());
         hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS.key()));
@@ -87,7 +88,9 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
         // support user-defined schema
         FileFormat fileFormat =
                 FileFormat.valueOf(
-                        pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()).toUpperCase());
+                        pluginConfig
+                                .getString(HdfsSourceConfig.FILE_FORMAT_TYPE.key())
+                                .toUpperCase());
         // only json text csv type support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
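
The source resolves the configured string into the FileFormat enum by upper-casing it first, so values like "parquet" and "PARQUET" are both accepted. A self-contained sketch of that lookup (simplified names, not the connector's actual classes):

    // Hypothetical, minimal reproduction of the enum lookup in the hunk above.
    enum FileFormat { TEXT, CSV, JSON, PARQUET, ORC }

    class FormatResolver {
        static FileFormat resolve(String configured) {
            // "parquet" -> PARQUET; an unknown value throws IllegalArgumentException
            return FileFormat.valueOf(configured.toUpperCase());
        }

        public static void main(String[] args) {
            System.out.println(resolve("parquet")); // PARQUET
        }
    }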
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index a44b3d458..9a0ac6c67 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -76,11 +76,11 @@ public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
             this.fileNameExpression = config.getString(BaseSinkConfig.FILE_NAME_EXPRESSION.key());
         }
 
-        if (config.hasPath(BaseSinkConfig.FILE_FORMAT.key())
-                && !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT.key()))) {
+        if (config.hasPath(BaseSinkConfig.FILE_FORMAT_TYPE.key())
+                && !StringUtils.isBlank(config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key()))) {
             this.fileFormat =
                     FileFormat.valueOf(
-                            config.getString(BaseSinkConfig.FILE_FORMAT.key())
+                            config.getString(BaseSinkConfig.FILE_FORMAT_TYPE.key())
                                     .toUpperCase(Locale.ROOT));
         }
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
index 43403f632..8e0decf73 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSinkConfig.java
@@ -179,11 +179,11 @@ public class BaseSinkConfig {
                     .withDescription(
                             "Only used when `custom_filename` is true. The time format of the path");
 
-    public static final Option<FileFormat> FILE_FORMAT =
-            Options.key("file_format")
+    public static final Option<FileFormat> FILE_FORMAT_TYPE =
+            Options.key("file_format_type")
                     .enumType(FileFormat.class)
                     .defaultValue(FileFormat.CSV)
-                    .withDescription("File format type");
+                    .withDescription("File format type, e.g. csv, orc, parquet, text");
 
     public static final Option<List<String>> SINK_COLUMNS =
             Options.key("sink_columns")
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 747b972aa..43855399f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -27,11 +27,12 @@ import org.apache.seatunnel.format.text.constant.TextFormatConstant;
 import java.util.List;
 
 public class BaseSourceConfig {
-    public static final Option<FileFormat> FILE_TYPE =
-            Options.key("type")
+    public static final Option<FileFormat> FILE_FORMAT_TYPE =
+            Options.key("file_format_type")
                     .objectType(FileFormat.class)
                     .noDefaultValue()
-                    .withDescription("File type");
+                    .withDescription(
+                            "File format type, e.g. json, csv, text, parquet, orc, avro....");
 
     public static final Option<String> FILE_PATH =
             Options.key("path")
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 144a6e9ef..4b931cb89 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -144,7 +144,9 @@ public class TextReadStrategy extends AbstractReadStrategy {
         } else {
             FileFormat fileFormat =
                     FileFormat.valueOf(
-                            pluginConfig.getString(BaseSourceConfig.FILE_TYPE.key()).toUpperCase());
+                            pluginConfig
+                                    .getString(BaseSourceConfig.FILE_FORMAT_TYPE.key())
+                                    .toUpperCase());
             if (fileFormat == FileFormat.CSV) {
                 fieldDelimiter = ",";
             }
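
The fallback above means an unset delimiter behaves differently per format. A compact, self-contained sketch of that decision (simplified; the real default comes from TextFormatConstant):

    class DelimiterSketch {
        enum FileFormat { TEXT, CSV, JSON }

        // CSV falls back to ",", everything else keeps the '\001' text default.
        static String fieldDelimiter(FileFormat format, String configured) {
            if (configured != null) {
                return configured;
            }
            return format == FileFormat.CSV ? "," : "\u0001";
        }

        public static void main(String[] args) {
            System.out.println((int) fieldDelimiter(FileFormat.CSV, null).charAt(0));  // 44 (',')
            System.out.println((int) fieldDelimiter(FileFormat.TEXT, null).charAt(0)); // 1 ('\001')
        }
    }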
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
index fa58c1d89..fad1d7506 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSinkFactory.java
@@ -42,21 +42,27 @@ public class FtpFileSinkFactory implements TableSinkFactory {
                 .required(FtpConfig.FTP_PORT)
                 .required(FtpConfig.FTP_USERNAME)
                 .required(FtpConfig.FTP_PASSWORD)
-                .optional(BaseSinkConfig.FILE_FORMAT)
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSinkConfig.ROW_DELIMITER,
                         BaseSinkConfig.FIELD_DELIMITER,
                         BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.CSV,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.JSON,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.ORC,
+                        BaseSinkConfig.ORC_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
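
The conditional rules gate format-specific options on the value of `file_format_type`: delimiters and text compression only apply to text, while ORC and Parquet carry their own compression options. A condensed sketch of the pattern (not standalone; `OptionRule`, `BaseSinkConfig`, and `FileFormat` come from seatunnel-api / connector-file-base as used in the factory above, and the final `.build()` is assumed from the builder pattern):

    OptionRule rule =
            OptionRule.builder()
                    .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                    .conditional(
                            BaseSinkConfig.FILE_FORMAT_TYPE,
                            FileFormat.TEXT,
                            BaseSinkConfig.ROW_DELIMITER,
                            BaseSinkConfig.FIELD_DELIMITER,
                            BaseSinkConfig.TXT_COMPRESS)
                    .conditional(
                            BaseSinkConfig.FILE_FORMAT_TYPE,
                            FileFormat.PARQUET,
                            BaseSinkConfig.PARQUET_COMPRESS)
                    .build();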
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index 017427f77..18e31dadf 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -54,7 +54,7 @@ public class FtpFileSource extends BaseFileSource {
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
                         FtpConfig.FILE_PATH.key(),
-                        FtpConfig.FILE_TYPE.key(),
+                        FtpConfig.FILE_FORMAT_TYPE.key(),
                         FtpConfig.FTP_HOST.key(),
                         FtpConfig.FTP_PORT.key(),
                         FtpConfig.FTP_USERNAME.key(),
@@ -67,13 +67,15 @@ public class FtpFileSource extends BaseFileSource {
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
         FileFormat fileFormat =
-                FileFormat.valueOf(pluginConfig.getString(FtpConfig.FILE_TYPE.key()).toUpperCase());
+                FileFormat.valueOf(
+                        pluginConfig.getString(FtpConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
             throw new FileConnectorException(
                     CommonErrorCode.ILLEGAL_ARGUMENT,
                     "Ftp file source connector only support read [text, csv, json] files");
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_TYPE.key()));
+        readStrategy =
+                ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(FtpConfig.FILE_PATH.key());
         hadoopConf = FtpConf.buildWithConfig(pluginConfig);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
index 65ff449f4..5e8c34172 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSourceFactory.java
@@ -46,11 +46,13 @@ public class FtpFileSourceFactory implements TableSourceFactory {
                 .required(FtpConfig.FTP_PORT)
                 .required(FtpConfig.FTP_USERNAME)
                 .required(FtpConfig.FTP_PASSWORD)
-                .required(FtpConfig.FILE_TYPE)
+                .required(FtpConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSourceConfig.DELIMITER)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE,
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
                         Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                         CatalogTableUtil.SCHEMA)
                 .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
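
On the source side, the same conditional mechanism ties the `schema` option to the formats that need one (text, json). An illustrative source config (host and credentials are placeholders):

    FtpFile {
      host = "ftp.example.com"    # placeholder
      port = 21
      user = "someuser"           # placeholder
      password = "somepassword"   # placeholder
      path = "/data/input"        # placeholder
      file_format_type = "text"
      schema = {
        name = string
        age = int
      }
    }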
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
index 9a2064bd9..22e389403 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/HdfsFileSinkFactory.java
@@ -39,21 +39,27 @@ public class HdfsFileSinkFactory implements TableSinkFactory {
         return OptionRule.builder()
                 .required(HdfsSourceConfig.DEFAULT_FS)
                 .required(BaseSinkConfig.FILE_PATH)
-                .optional(BaseSinkConfig.FILE_FORMAT)
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSinkConfig.ROW_DELIMITER,
                         BaseSinkConfig.FIELD_DELIMITER,
                         BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.CSV,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.JSON,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.ORC,
+                        BaseSinkConfig.ORC_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
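
Under the renamed sink option, a minimal HdfsFile sink block might read as in
this sketch (the fs.defaultFS and path values are placeholders; compress_codec
mirrors the ORC_COMPRESS conditional above and the codec names used by the e2e
configs later in this commit):

    sink {
      HdfsFile {
        fs.defaultFS = "hdfs://localhost:9000"
        path = "/tmp/seatunnel/out"
        file_format_type = "orc"
        compress_codec = "zlib"
      }
    }
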
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
index e961bb137..ef2f8a54b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSourceFactory.java
@@ -43,11 +43,13 @@ public class HdfsFileSourceFactory implements TableSourceFactory {
         return OptionRule.builder()
                 .required(HdfsSourceConfig.FILE_PATH)
                 .required(HdfsSourceConfig.DEFAULT_FS)
-                .required(BaseSourceConfig.FILE_TYPE)
+                .required(BaseSourceConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSourceConfig.DELIMITER)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE,
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
                         Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                         CatalogTableUtil.SCHEMA)
                 .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
index 41694bb8a..19a8d17ee 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/sink/LocalFileSinkFactory.java
@@ -37,21 +37,27 @@ public class LocalFileSinkFactory implements TableSinkFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(BaseSinkConfig.FILE_PATH)
-                .optional(BaseSinkConfig.FILE_FORMAT)
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSinkConfig.ROW_DELIMITER,
                         BaseSinkConfig.FIELD_DELIMITER,
                         BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.CSV,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.JSON,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.ORC,
+                        BaseSinkConfig.ORC_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 673256c2a..01cb7dcb1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -57,7 +57,7 @@ public class LocalFileSource extends BaseFileSource {
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
                         LocalSourceConfig.FILE_PATH.key(),
-                        LocalSourceConfig.FILE_TYPE.key());
+                        LocalSourceConfig.FILE_FORMAT_TYPE.key());
         if (!result.isSuccess()) {
             throw new FileConnectorException(
                     SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
@@ -66,7 +66,8 @@ public class LocalFileSource extends BaseFileSource {
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
         readStrategy =
-                ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()));
+                ReadStrategyFactory.of(
+                        pluginConfig.getString(LocalSourceConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(LocalSourceConfig.FILE_PATH.key());
         hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
@@ -80,7 +81,9 @@ public class LocalFileSource extends BaseFileSource {
         // support user-defined schema
         FileFormat fileFormat =
                 FileFormat.valueOf(
-                        pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()).toUpperCase());
+                        pluginConfig
+                                .getString(LocalSourceConfig.FILE_FORMAT_TYPE.key())
+                                .toUpperCase());
         // only json text csv type support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
index ea960cba3..d6f982352 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSourceFactory.java
@@ -42,11 +42,13 @@ public class LocalFileSourceFactory implements TableSourceFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(LocalSourceConfig.FILE_PATH)
-                .required(BaseSourceConfig.FILE_TYPE)
+                .required(BaseSourceConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSourceConfig.DELIMITER)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE,
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
                         Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                         CatalogTableUtil.SCHEMA)
                 .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 655988131..5beb85921 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -42,21 +42,27 @@ public class OssFileSinkFactory implements TableSinkFactory {
                 .required(OssConfig.ACCESS_KEY)
                 .required(OssConfig.ACCESS_SECRET)
                 .required(OssConfig.ENDPOINT)
-                .optional(BaseSinkConfig.FILE_FORMAT)
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSinkConfig.ROW_DELIMITER,
                         BaseSinkConfig.FIELD_DELIMITER,
                         BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.CSV,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.JSON,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.ORC,
+                        BaseSinkConfig.ORC_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 5fcc468ce..1f978e5b3 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -55,7 +55,7 @@ public class OssFileSource extends BaseFileSource {
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
                         OssConfig.FILE_PATH.key(),
-                        OssConfig.FILE_TYPE.key(),
+                        OssConfig.FILE_FORMAT_TYPE.key(),
                         OssConfig.BUCKET.key(),
                         OssConfig.ACCESS_KEY.key(),
                         OssConfig.ACCESS_SECRET.key(),
@@ -67,7 +67,8 @@ public class OssFileSource extends BaseFileSource {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
+        readStrategy =
+                ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
         hadoopConf = OssConf.buildWithConfig(pluginConfig);
@@ -80,7 +81,8 @@ public class OssFileSource extends BaseFileSource {
         }
         // support user-defined schema
         FileFormat fileFormat =
-                FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
+                FileFormat.valueOf(
+                        pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         // only json text csv type support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 103b8addb..c9b77de9d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss-jindo/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -46,11 +46,13 @@ public class OssFileSourceFactory implements TableSourceFactory {
                 .required(OssConfig.ACCESS_KEY)
                 .required(OssConfig.ACCESS_SECRET)
                 .required(OssConfig.ENDPOINT)
-                .required(BaseSourceConfig.FILE_TYPE)
+                .required(BaseSourceConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSourceConfig.DELIMITER)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE,
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
                         Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                         CatalogTableUtil.SCHEMA)
                 .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
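
Applied to a job config, the renamed OssFile source key would surface as in
this sketch (bucket, credential, and endpoint values are placeholders; the key
names are assumed to map to the OssConfig options required in the factory
above):

    source {
      OssFile {
        bucket = "oss://example-bucket"
        access_key = "xxxxxx"
        access_secret = "xxxxxx"
        endpoint = "oss-cn-beijing.aliyuncs.com"
        path = "/data/in"
        file_format_type = "json"
        schema = {
          fields {
            id = "int"
            name = "string"
          }
        }
      }
    }
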
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
index 655988131..5beb85921 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSinkFactory.java
@@ -42,21 +42,27 @@ public class OssFileSinkFactory implements TableSinkFactory {
                 .required(OssConfig.ACCESS_KEY)
                 .required(OssConfig.ACCESS_SECRET)
                 .required(OssConfig.ENDPOINT)
-                .optional(BaseSinkConfig.FILE_FORMAT)
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSinkConfig.ROW_DELIMITER,
                         BaseSinkConfig.FIELD_DELIMITER,
                         BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.CSV,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.JSON,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.ORC,
+                        BaseSinkConfig.ORC_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 1402d03fb..97d283460 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -54,7 +54,7 @@ public class OssFileSource extends BaseFileSource {
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
                         OssConfig.FILE_PATH.key(),
-                        OssConfig.FILE_TYPE.key(),
+                        OssConfig.FILE_FORMAT_TYPE.key(),
                         OssConfig.BUCKET.key(),
                         OssConfig.ACCESS_KEY.key(),
                         OssConfig.ACCESS_SECRET.key(),
@@ -66,7 +66,8 @@ public class OssFileSource extends BaseFileSource {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
+        readStrategy =
+                ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(OssConfig.FILE_PATH.key());
         hadoopConf = OssConf.buildWithConfig(pluginConfig);
@@ -79,7 +80,8 @@ public class OssFileSource extends BaseFileSource {
         }
         // support user-defined schema
         FileFormat fileFormat =
-                FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
+                FileFormat.valueOf(
+                        pluginConfig.getString(OssConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         // only json text csv type support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
index 103b8addb..c9b77de9d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSourceFactory.java
@@ -46,11 +46,13 @@ public class OssFileSourceFactory implements TableSourceFactory {
                 .required(OssConfig.ACCESS_KEY)
                 .required(OssConfig.ACCESS_SECRET)
                 .required(OssConfig.ENDPOINT)
-                .required(BaseSourceConfig.FILE_TYPE)
+                .required(BaseSourceConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSourceConfig.DELIMITER)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE,
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
                         Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                         CatalogTableUtil.SCHEMA)
                 .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
index 9b16b3419..816597f05 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/catalog/S3Catalog.java
@@ -73,10 +73,10 @@ public class S3Catalog implements Catalog {
     @Override
     public void open() throws CatalogException {
         ReadStrategy readStrategy =
-                ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+                ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(s3Config);
         this.defaultDatabase = s3Config.getString(S3Config.FILE_PATH.key());
-        readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_TYPE.key()));
+        readStrategy = ReadStrategyFactory.of(s3Config.getString(S3Config.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(s3Config);
         try {
             fileSystem =
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
index 685068cc9..86a08931a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSinkFactory.java
@@ -47,21 +47,27 @@ public class S3FileSinkFactory implements TableSinkFactory {
                         S3Config.S3_ACCESS_KEY,
                         S3Config.S3_SECRET_KEY)
                 .optional(S3Config.S3_PROPERTIES)
-                .optional(BaseSinkConfig.FILE_FORMAT)
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSinkConfig.ROW_DELIMITER,
                         BaseSinkConfig.FIELD_DELIMITER,
                         BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.CSV,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.JSON,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.ORC,
+                        BaseSinkConfig.ORC_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index 0136b6a7d..fea806c46 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -54,7 +54,7 @@ public class S3FileSource extends BaseFileSource {
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
                         S3Config.FILE_PATH.key(),
-                        S3Config.FILE_TYPE.key(),
+                        S3Config.FILE_FORMAT_TYPE.key(),
                         S3Config.S3_BUCKET.key());
         if (!result.isSuccess()) {
             throw new FileConnectorException(
@@ -63,7 +63,8 @@ public class S3FileSource extends BaseFileSource {
                             "PluginName: %s, PluginType: %s, Message: %s",
                             getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_TYPE.key()));
+        readStrategy =
+                ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(S3Config.FILE_PATH.key());
         hadoopConf = S3Conf.buildWithConfig(pluginConfig);
@@ -76,7 +77,8 @@ public class S3FileSource extends BaseFileSource {
         }
         // support user-defined schema
         FileFormat fileFormat =
-                FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE.key()).toUpperCase());
+                FileFormat.valueOf(
+                        pluginConfig.getString(S3Config.FILE_FORMAT_TYPE.key()).toUpperCase());
         // only json text csv type support user-defined schema now
         if (pluginConfig.hasPath(CatalogTableUtil.SCHEMA.key())) {
             switch (fileFormat) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
index 165d0d741..51a5e536f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSourceFactory.java
@@ -42,7 +42,7 @@ public class S3FileSourceFactory implements TableSourceFactory {
     public OptionRule optionRule() {
         return OptionRule.builder()
                 .required(S3Config.FILE_PATH)
-                .required(S3Config.FILE_TYPE)
+                .required(S3Config.FILE_FORMAT_TYPE)
                 .required(S3Config.S3_BUCKET)
                 .required(S3Config.FS_S3A_ENDPOINT)
                 .required(S3Config.S3A_AWS_CREDENTIALS_PROVIDER)
@@ -53,9 +53,11 @@ public class S3FileSourceFactory implements TableSourceFactory {
                         S3Config.S3_SECRET_KEY)
                 .optional(S3Config.S3_PROPERTIES)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSourceConfig.DELIMITER)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE,
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
                         Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                         CatalogTableUtil.SCHEMA)
                 .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
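
The same rename on the S3File source shows up in job configs as in this sketch
(bucket, endpoint, credentials-provider class, and key values are placeholder
assumptions based on the S3Config options required above, not part of this
diff):

    source {
      S3File {
        bucket = "s3a://example-bucket"
        fs.s3a.endpoint = "s3.cn-north-1.amazonaws.com.cn"
        fs.s3a.aws.credentials.provider = "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider"
        access_key = "xxxxxx"
        secret_key = "xxxxxx"
        path = "/data/in"
        file_format_type = "text"
        delimiter = "#"
        schema = {
          fields {
            id = "int"
          }
        }
      }
    }
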
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
index ac96f89de..8cad73445 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSinkFactory.java
@@ -42,21 +42,27 @@ public class SftpFileSinkFactory implements TableSinkFactory {
                 .required(SftpConfig.SFTP_PORT)
                 .required(SftpConfig.SFTP_USERNAME)
                 .required(SftpConfig.SFTP_PASSWORD)
-                .optional(BaseSinkConfig.FILE_FORMAT)
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.TEXT,
                         BaseSinkConfig.ROW_DELIMITER,
                         BaseSinkConfig.FIELD_DELIMITER,
                         BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.CSV, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.CSV,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.JSON, BaseSinkConfig.TXT_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.JSON,
+                        BaseSinkConfig.TXT_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT, FileFormat.ORC, BaseSinkConfig.ORC_COMPRESS)
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
+                        FileFormat.ORC,
+                        BaseSinkConfig.ORC_COMPRESS)
                 .conditional(
-                        BaseSinkConfig.FILE_FORMAT,
+                        BaseSinkConfig.FILE_FORMAT_TYPE,
                         FileFormat.PARQUET,
                         BaseSinkConfig.PARQUET_COMPRESS)
                 .optional(BaseSinkConfig.CUSTOM_FILENAME)
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index 306835cb0..28a06d17a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -54,7 +54,7 @@ public class SftpFileSource extends BaseFileSource {
                 CheckConfigUtil.checkAllExists(
                         pluginConfig,
                         SftpConfig.FILE_PATH.key(),
-                        SftpConfig.FILE_TYPE.key(),
+                        SftpConfig.FILE_FORMAT_TYPE.key(),
                         SftpConfig.SFTP_HOST.key(),
                         SftpConfig.SFTP_PORT.key(),
                         SftpConfig.SFTP_USERNAME.key(),
@@ -68,13 +68,14 @@ public class SftpFileSource extends BaseFileSource {
         }
         FileFormat fileFormat =
                 FileFormat.valueOf(
-                        pluginConfig.getString(SftpConfig.FILE_TYPE.key()).toUpperCase());
+                        pluginConfig.getString(SftpConfig.FILE_FORMAT_TYPE.key()).toUpperCase());
         if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
             throw new FileConnectorException(
                     CommonErrorCode.ILLEGAL_ARGUMENT,
                     "Sftp file source connector only support read [text, csv, json] files");
         }
-        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_TYPE.key()));
+        readStrategy =
+                ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_FORMAT_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(SftpConfig.FILE_PATH.key());
         hadoopConf = SftpConf.buildWithConfig(pluginConfig);
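
A SftpFile source block under the new key name, as a hedged sketch (host,
user, password, and path are placeholders; the factory diff that follows
applies the same TEXT/JSON schema conditional as the other file sources):

    source {
      SftpFile {
        host = "sftp.example.test"
        port = 22
        user = "seatunnel"
        password = "pass"
        path = "/data/in"
        file_format_type = "csv"
      }
    }
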
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
index dcec3f343..32ecfa392 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSourceFactory.java
@@ -46,11 +46,13 @@ public class SftpFileSourceFactory implements TableSourceFactory {
                 .required(SftpConfig.SFTP_PORT)
                 .required(SftpConfig.SFTP_USERNAME)
                 .required(SftpConfig.SFTP_PASSWORD)
-                .required(BaseSourceConfig.FILE_TYPE)
+                .required(BaseSourceConfig.FILE_FORMAT_TYPE)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE, FileFormat.TEXT, BaseSourceConfig.DELIMITER)
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
+                        FileFormat.TEXT,
+                        BaseSourceConfig.DELIMITER)
                 .conditional(
-                        BaseSourceConfig.FILE_TYPE,
+                        BaseSourceConfig.FILE_FORMAT_TYPE,
                         Arrays.asList(FileFormat.TEXT, FileFormat.JSON),
                         CatalogTableUtil.SCHEMA)
                 .optional(BaseSourceConfig.PARSE_PARTITION_FROM_PATH)
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index b773bc4bd..19068fcf3 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -55,6 +55,7 @@ import java.util.stream.Collectors;
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FIELD_DELIMITER;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT;
+import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_FORMAT_TYPE;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_NAME_EXPRESSION;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.FILE_PATH;
 import static org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig.HAVE_PARTITION;
@@ -140,7 +138,7 @@ public class HiveSink extends BaseHdfsFileSink {
             pluginConfig =
                     pluginConfig
                             .withValue(
-                                    FILE_FORMAT.key(),
+                                    FILE_FORMAT_TYPE.key(),
                                     ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()))
                             .withValue(
                                     FIELD_DELIMITER.key(),
@@ -151,12 +149,12 @@ public class HiveSink extends BaseHdfsFileSink {
         } else if (PARQUET_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
             pluginConfig =
                     pluginConfig.withValue(
-                            FILE_FORMAT.key(),
+                            FILE_FORMAT_TYPE.key(),
                             ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
         } else if (ORC_OUTPUT_FORMAT_CLASSNAME.equals(outputFormat)) {
             pluginConfig =
                     pluginConfig.withValue(
-                            FILE_FORMAT.key(),
+                            FILE_FORMAT_TYPE.key(),
                             ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
         } else {
             throw new HiveConnectorException(
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
index 5f2e27424..8ec5d2d3f 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/HiveSource.java
@@ -126,7 +126,7 @@ public class HiveSource extends BaseHdfsFileSource {
         if (TEXT_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
             pluginConfig =
                     pluginConfig.withValue(
-                            FILE_TYPE.key(),
+                            FILE_FORMAT_TYPE.key(),
                             ConfigValueFactory.fromAnyRef(FileFormat.TEXT.toString()));
             // Build schema from hive table information
             // Because the entrySet in typesafe config couldn't keep key-value order
@@ -140,12 +140,12 @@ public class HiveSource extends BaseHdfsFileSource {
         } else if (PARQUET_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
             pluginConfig =
                     pluginConfig.withValue(
-                            FILE_TYPE.key(),
+                            FILE_FORMAT_TYPE.key(),
                             ConfigValueFactory.fromAnyRef(FileFormat.PARQUET.toString()));
         } else if (ORC_INPUT_FORMAT_CLASSNAME.equals(inputFormat)) {
             pluginConfig =
                     pluginConfig.withValue(
-                            FILE_TYPE.key(),
+                            FILE_FORMAT_TYPE.key(),
                             ConfigValueFactory.fromAnyRef(FileFormat.ORC.toString()));
         } else {
             throw new HiveConnectorException(
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
index 44b875f6b..2c11040fd 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/catalog/redshift/RedshiftDataTypeConvertor.java
@@ -1,3 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
 package org.apache.seatunnel.connectors.seatunnel.jdbc.catalog.redshift;
 
 import org.apache.seatunnel.api.table.catalog.DataTypeConvertException;
@@ -71,7 +89,7 @@ public class RedshiftDataTypeConvertor implements DataTypeConvertor<String> {
     private static final String REDSHIFT_TIMETZ = "timetz";
     private static final String REDSHIFT_TIMESTAMP = "timestamp";
     private static final String REDSHIFT_TIMESTAMP_WITH_OUT_TIME_ZONE =
-        "timestamp without time zone";
+            "timestamp without time zone";
 
     private static final String REDSHIFT_TIMESTAMPTZ = "timestamptz";
 
diff --git a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
index 2a3a7d66c..0211cadbe 100644
--- a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
+++ b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
@@ -46,7 +46,7 @@ public class S3RedshiftFactory implements TableSinkFactory {
                         S3RedshiftConfig.EXECUTE_SQL,
                         BaseSourceConfig.FILE_PATH)
                 .optional(S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY)
-                .optional(BaseSinkConfig.FILE_FORMAT)
+                .optional(BaseSinkConfig.FILE_FORMAT_TYPE)
                 .optional(BaseSinkConfig.FILENAME_TIME_FORMAT)
                 .optional(BaseSinkConfig.FIELD_DELIMITER)
                 .optional(BaseSinkConfig.ROW_DELIMITER)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
index 9d48af1cf..9f58e0f22 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
@@ -72,7 +72,7 @@ sink {
     partition_dir_expression = "${k0}=${v0}"
     is_partition_field_write_in_file = true
     file_name_expression = "${transactionId}_${now}"
-    file_format = "json"
+    file_format_type = "json"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
   }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf
index ba94dce76..19f7b95a8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf
@@ -28,7 +28,7 @@ env {
 source {
   LocalFile {
     path = "/seatunnel/read/json"
-    type = "json"
+    file_format_type = "json"
     schema = {
       fields {
         c_map = "map<string, string>"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
index a134e9fc0..3490fb169 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
@@ -72,7 +72,7 @@ sink {
     partition_dir_expression = "${k0}=${v0}"
     is_partition_field_write_in_file = true
     file_name_expression = "${transactionId}_${now}"
-    file_format = "orc"
+    file_format_type = "orc"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
     compress_codec = "zlib"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
index 7549af6fd..77a9c3a58 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
@@ -27,8 +27,8 @@ env {
 
 source {
   LocalFile {
-    path = "/seatunnel/read/orc"
-    type = "orc"
+    path = "/seatunnel/read/orc"
+    file_format_type = "orc"
     result_table_name = "fake"
   }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
index c3eae33b0..a915229f5 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
@@ -72,7 +72,7 @@ sink {
     partition_dir_expression = "${k0}=${v0}"
     is_partition_field_write_in_file = true
     file_name_expression = "${transactionId}_${now}"
-    file_format = "parquet"
+    file_format_type = "parquet"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
     compress_codec = "gzip"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
index d99fef31a..07776f0b8 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
@@ -28,7 +28,7 @@ env {
 source {
   LocalFile {
     path = "/seatunnel/read/parquet"
-    type = "parquet"
+    file_format_type = "parquet"
     result_table_name = "fake"
   }
 }
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
index 795d82234..294c4538e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
@@ -72,7 +72,7 @@ sink {
     partition_dir_expression = "${k0}=${v0}"
     is_partition_field_write_in_file = true
     file_name_expression = "${transactionId}_${now}"
-    file_format = "text"
+    file_format_type = "text"
     filename_time_format = "yyyy.MM.dd"
     is_enable_transaction = true
     compress_codec = "lzo"
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_skip_headers.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_skip_headers.conf
index c2b6e0402..7ebeba63a 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_skip_headers.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_skip_headers.conf
@@ -28,7 +28,7 @@ env {
 source {
   LocalFile {
     path = "/seatunnel/read/text"
-    type = "text"
+    file_format_type = "text"
     skip_header_row_number = 1
     schema = {
       fields {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf
index 11c0a5710..05ce1ee6e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf
@@ -28,7 +28,7 @@ env {
 source {
   LocalFile {
     path = "/seatunnel/read/text"
-    type = "text"
+    file_format_type = "text"
     schema = {
       fields {
         c_map = "map<string, string>"
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
index 10c2b673e..78ea7db94 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file.conf
@@ -86,7 +86,7 @@ sink {
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
-    file_format="text"
+    file_format_type="text"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error"
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
index 2a327e463..5bbd38ada 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/batch_fakesource_to_file_complex.conf
@@ -132,7 +132,7 @@ sink {
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
-    file_format="text"
+    file_format_type="text"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error",
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
index 1c3bd3ebe..f469c961f 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_template.conf
@@ -84,7 +84,7 @@ sink {
     field_delimiter="\t"
     row_delimiter="\n"
     file_name_expression="${transactionId}_${now}"
-    file_format="text"
+    file_format_type="text"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error"
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_two_pipeline_template.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_two_pipeline_template.conf
index ef1ce902a..4d7f65d53 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_two_pipeline_template.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/cluster_batch_fake_to_localfile_two_pipeline_template.conf
@@ -146,7 +146,7 @@ sink {
     field_delimiter="\t"
     row_delimiter="\n"
     file_name_expression="${transactionId}_${now}"
-    file_format="text"
+    file_format_type="text"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error"
diff --git a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
index 19ecb9359..96daf1365 100644
--- a/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
+++ b/seatunnel-e2e/seatunnel-engine-e2e/connector-seatunnel-e2e-base/src/test/resources/streaming_fakesource_to_file_complex.conf
@@ -132,7 +132,7 @@ sink {
     partition_dir_expression="${k0}=${v0}"
     is_partition_field_write_in_file=true
     file_name_expression="${transactionId}_${now}"
-    file_format="text"
+    file_format_type="text"
     filename_time_format="yyyy.MM.dd"
     is_enable_transaction=true
     save_mode="error",