You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ty...@apache.org on 2022/10/21 07:09:32 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2][SFTP] Add SFTP file source & sink connector (#3006)

This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 9e496383b [Feature][Connector-V2][SFTP] Add SFTP file source & sink connector (#3006)
9e496383b is described below

commit 9e496383b840754251ff2eca8265f7a63a0f28a0
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Fri Oct 21 15:09:26 2022 +0800

    [Feature][Connector-V2][SFTP] Add SFTP file source & sink connector (#3006)
    
    * [Feature][Connector-V2][SFTP] Add sftp file source connector
    
    * [Feature][Connector-V2][SFTP] Fix the stuck bug
    
    * [Feature][Connector-V2][SFTP] Fix code style
    
    * [Feature][Connector-V2][SFTP] Fix license header
    
    * [Feature][Connector-V2][SFTP] Add sftp sink
    
    * [Feature][Connector-V2][SFTP] Add docs for sftp connectors
    
    * [Feature][Connector-V2][SFTP] Update plugin-mapping.properties
    
    * [Feature][Connector-V2][SFTP] Update sftp source
    
    * [Feature][Connector-V2][SFTP] Update sftp source docs
    
    * [Feature][Connector-V2][File] Update docs
    
    * [Feature][Connector-V2][SFTP] Update seatunnel-dist pom file
    
    * [Feature][Connector-V2][SFTP] Add change log
    
    * [Improve][Connector-V2][SFTP] Fix code compile error
---
 docs/en/connector-v2/sink/FtpFile.md               |  52 +-
 docs/en/connector-v2/sink/HdfsFile.md              | 104 ++--
 docs/en/connector-v2/sink/LocalFile.md             |  95 ++--
 docs/en/connector-v2/sink/OssFile.md               | 110 ++--
 .../connector-v2/sink/{FtpFile.md => SftpFile.md}  |  81 ++-
 docs/en/connector-v2/source/SftpFile.md            | 214 +++++++
 plugin-mapping.properties                          |   2 +
 .../seatunnel/file/config/FileSystemType.java      |   1 +
 .../connector-file/connector-file-sftp/pom.xml     |  45 ++
 .../seatunnel/file/sftp/config/SftpConf.java       |  44 ++
 .../seatunnel/file/sftp/config/SftpConfig.java}    |  25 +-
 .../seatunnel/file/sftp/sink/SftpFileSink.java     |  52 ++
 .../seatunnel/file/sftp/source/SftpFileSource.java |  98 ++++
 .../file/sftp/system/SFTPConnectionPool.java       | 302 ++++++++++
 .../seatunnel/file/sftp/system/SFTPFileSystem.java | 619 +++++++++++++++++++++
 .../file/sftp/system/SFTPInputStream.java          | 131 +++++
 .../services/org.apache.hadoop.fs.FileSystem       |  16 +
 seatunnel-connectors-v2/connector-file/pom.xml     |   1 +
 seatunnel-dist/pom.xml                             |   6 +
 19 files changed, 1742 insertions(+), 256 deletions(-)

diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md
index c360be27b..5e3d9888b 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -9,7 +9,16 @@ Output data to Ftp .
 ## Key features
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use 2PC commit to ensure `exactly-once`
+
 - [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] file format
+    - [x] text
+    - [x] csv
+    - [x] parquet
+    - [x] orc
+    - [x] json
 
 ##  Options
 
@@ -26,12 +35,11 @@ Output data to Ftp .
 | field_delimiter                  | string  | no       | '\001'                                                    |
 | row_delimiter                    | string  | no       | "\n"                                                      |
 | partition_by                     | array   | no       | -                                                         |
-| partition_dir_expression         | string  | no       | "\${k0}=\${v0}\/\${k1}=\${v1}\/...\/\${kn}=\${vn}\/"      |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"                |
 | is_partition_field_write_in_file | boolean | no       | false                                                     |
 | sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                                                      |
-| save_mode                        | string  | no       | "error"                                                   |
-| common-options                   |         | no       | -             |
+| common-options                   |         | no       | -                                                         |
 
 ### host [string]
 
@@ -119,14 +127,6 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
 
 Only support `true` now.
 
-### save_mode [string]
-
-Storage mode, currently supports `overwrite`. This means we will delete the old file when a new file have a same name with it.
-
-If `is_enable_transaction` is `true`, Basically, we won't encounter the same file name. Because we will add the transaction id to file name.
-
-For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)
-
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -138,21 +138,21 @@ For text file format
 ```bash
 
 FtpFile {
-    host="xxx.xxx.xxx.xxx"
-    port=21
-    username="username"
-    password="password"
-    path="/data/ftp"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
+    host = "xxx.xxx.xxx.xxx"
+    port = 21
+    username = "username"
+    password = "password"
+    path = "/data/ftp"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "text"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
 }
 
 ```
diff --git a/docs/en/connector-v2/sink/HdfsFile.md b/docs/en/connector-v2/sink/HdfsFile.md
index 7ce94709d..9b49a4992 100644
--- a/docs/en/connector-v2/sink/HdfsFile.md
+++ b/docs/en/connector-v2/sink/HdfsFile.md
@@ -24,22 +24,22 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 In order to use this connector, You must ensure your spark/flink cluster already integrated hadoop. The tested hadoop version is 2.x.
 
-| name                             | type   | required | default value                                           |
-|----------------------------------| ------ | -------- |--------------------------------------------------------|
-| fs.defaultFS                     | string | yes      | -                                                       |
-| path                             | string | yes      | -                                                       |
-| file_name_expression             | string | no       | "${transactionId}"                      |
-| file_format                      | string | no       | "text"                                                 |
-| filename_time_format             | string | no       | "yyyy.MM.dd"                                           |
-| field_delimiter                  | string | no       | '\001'                                                 |
-| row_delimiter                    | string | no       | "\n"                                                   |
-| partition_by                     | array  | no       | -                                                       |
-| partition_dir_expression         | string | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"             |
-| is_partition_field_write_in_file | boolean| no       | false                                                   |
-| sink_columns                     | array  | no       | When this parameter is empty, all fields are sink columns |
-| is_enable_transaction            | boolean| no       | true                                                   |
-| save_mode                        | string | no       | "error"                                                 |
-| common-options                   |        | no       | -                                                       |
+| name                             | type    | required | default value                                             |
+|----------------------------------|---------|----------|-----------------------------------------------------------|
+| fs.defaultFS                     | string  | yes      | -                                                         |
+| path                             | string  | yes      | -                                                         |
+| file_name_expression             | string  | no       | "${transactionId}"                                        |
+| file_format                      | string  | no       | "text"                                                    |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                                              |
+| field_delimiter                  | string  | no       | '\001'                                                    |
+| row_delimiter                    | string  | no       | "\n"                                                      |
+| partition_by                     | array   | no       | -                                                         |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"                |
+| is_partition_field_write_in_file | boolean | no       | false                                                     |
+| sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction            | boolean | no       | true                                                      |
+| common-options                   |         | no       | -                                                         |
+
 ### fs.defaultFS [string]
 
 The hadoop cluster address that start with `hdfs://`, for example: `hdfs://hadoopcluster`
@@ -115,14 +115,6 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
 
 Only support `true` now.
 
-### save_mode [string]
-
-Storage mode, currently supports `overwrite`. This means we will delete the old file when a new file have a same name with it.
-
-If `is_enable_transaction` is `true`, Basically, we won't encounter the same file name. Because we will add the transaction id to file name.
-
-For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)
-
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details
@@ -134,18 +126,18 @@ For text file format
 ```bash
 
 HdfsFile {
-    fs.defaultFS="hdfs://hadoopcluster"
-    path="/tmp/hive/warehouse/test2"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
+    fs.defaultFS = "hdfs://hadoopcluster"
+    path = "/tmp/hive/warehouse/test2"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "text"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
 }
 
 ```
@@ -155,16 +147,16 @@ For parquet file format
 ```bash
 
 HdfsFile {
-    fs.defaultFS="hdfs://hadoopcluster"
-    path="/tmp/hive/warehouse/test2"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="parquet"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
+    fs.defaultFS = "hdfs://hadoopcluster"
+    path = "/tmp/hive/warehouse/test2"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "parquet"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
 }
 
 ```
@@ -174,16 +166,16 @@ For orc file format
 ```bash
 
 HdfsFile {
-    fs.defaultFS="hdfs://hadoopcluster"
-    path="/tmp/hive/warehouse/test2"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="orc"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
+    fs.defaultFS = "hdfs://hadoopcluster"
+    path = "/tmp/hive/warehouse/test2"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "orc"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
 }
 
 ```
diff --git a/docs/en/connector-v2/sink/LocalFile.md b/docs/en/connector-v2/sink/LocalFile.md
index 15fd18c60..6246e4059 100644
--- a/docs/en/connector-v2/sink/LocalFile.md
+++ b/docs/en/connector-v2/sink/LocalFile.md
@@ -22,21 +22,20 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ## Options
 
-| name                              | type   | required | default value                                       |
-| --------------------------------- | ------ | -------- | --------------------------------------------------- |
-| path                              | string | yes      | -                                                   |
-| file_name_expression              | string | no       | "${transactionId}"                                  |
-| file_format                       | string | no       | "text"                                              |
-| filename_time_format              | string | no       | "yyyy.MM.dd"                                        |
-| field_delimiter                   | string | no       | '\001'                                              |
-| row_delimiter                     | string | no       | "\n"                                                |
-| partition_by                      | array  | no       | -                                                   |
-| partition_dir_expression          | string | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"          |
-| is_partition_field_write_in_file  | boolean| no       | false                                               |
-| sink_columns                      | array  | no       | When this parameter is empty, all fields are sink columns |
-| is_enable_transaction             | boolean| no       | true                                                |
-| save_mode                         | string | no       | "error"                                             |
-| common-options                    |        | no       | -                                                  |
+| name                             | type    | required | default value                                             |
+|----------------------------------|---------|----------|-----------------------------------------------------------|
+| path                             | string  | yes      | -                                                         |
+| file_name_expression             | string  | no       | "${transactionId}"                                        |
+| file_format                      | string  | no       | "text"                                                    |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                                              |
+| field_delimiter                  | string  | no       | '\001'                                                    |
+| row_delimiter                    | string  | no       | "\n"                                                      |
+| partition_by                     | array   | no       | -                                                         |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"                |
+| is_partition_field_write_in_file | boolean | no       | false                                                     |
+| sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction            | boolean | no       | true                                                      |
+| common-options                   |         | no       | -                                                         |
 
 ### path [string]
 
@@ -109,14 +108,6 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
 
 Only support `true` now.
 
-### save_mode [string]
-
-Storage mode, currently supports `overwrite`. This means we will delete the old file when a new file have a same name with it.
-
-If `is_enable_transaction` is `true`, Basically, we won't encounter the same file name. Because we will add the transaction id to file name.
-
-For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes).
-
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -128,17 +119,17 @@ For text file format
 ```bash
 
 LocalFile {
-    path="/tmp/hive/warehouse/test2"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
+    path = "/tmp/hive/warehouse/test2"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "text"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
 }
 
 ```
@@ -148,15 +139,15 @@ For parquet file format
 ```bash
 
 LocalFile {
-    path="/tmp/hive/warehouse/test2"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="parquet"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
+    path = "/tmp/hive/warehouse/test2"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "parquet"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
 }
 
 ```
@@ -166,15 +157,15 @@ For orc file format
 ```bash
 
 LocalFile {
-    path="/tmp/hive/warehouse/test2"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="orc"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
+    path = "/tmp/hive/warehouse/test2"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "orc"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
 }
 
 ```
diff --git a/docs/en/connector-v2/sink/OssFile.md b/docs/en/connector-v2/sink/OssFile.md
index cbaeb5daf..a7d9c8bb8 100644
--- a/docs/en/connector-v2/sink/OssFile.md
+++ b/docs/en/connector-v2/sink/OssFile.md
@@ -25,25 +25,24 @@ By default, we use 2PC commit to ensure `exactly-once`
 
 ## Options
 
-| name                             | type   | required | default value               |
-|----------------------------------| ------ |---------|-----------------------------|
-| path                             | string | yes     | -                           |
-| bucket                           | string | yes     | -                           |
-| access_key                       | string | yes     | -                           |
-| access_secret                    | string | yes     | -                           |
-| endpoint                         | string | yes     | -                           |
-| file_name_expression             | string | no      | "${transactionId}"          |
-| file_format                      | string | no      | "text"                      |
-| filename_time_format             | string | no      | "yyyy.MM.dd"                |
-| field_delimiter                  | string | no      | '\001'                      |
-| row_delimiter                    | string | no      | "\n"                        |
-| partition_by                     | array  | no      | -                          |
-| partition_dir_expression         | string | no      | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/" |
-| is_partition_field_write_in_file | boolean| no      | false                      |
-| sink_columns                     | array  | no      | When this parameter is empty, all fields are sink columns |
-| is_enable_transaction            | boolean| no      | true                        |
-| save_mode                        | string | no      | "error"                    |
-| common-options                   |        | no       | -                          |
+| name                             | type    | required | default value                                             |
+|----------------------------------|---------|----------|-----------------------------------------------------------|
+| path                             | string  | yes      | -                                                         |
+| bucket                           | string  | yes      | -                                                         |
+| access_key                       | string  | yes      | -                                                         |
+| access_secret                    | string  | yes      | -                                                         |
+| endpoint                         | string  | yes      | -                                                         |
+| file_name_expression             | string  | no       | "${transactionId}"                                        |
+| file_format                      | string  | no       | "text"                                                    |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                                              |
+| field_delimiter                  | string  | no       | '\001'                                                    |
+| row_delimiter                    | string  | no       | "\n"                                                      |
+| partition_by                     | array   | no       | -                                                         |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"                |
+| is_partition_field_write_in_file | boolean | no       | false                                                     |
+| sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction            | boolean | no       | true                                                      |
+| common-options                   |         | no       | -                                                         |
 
 ### path [string]
 
@@ -132,14 +131,6 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
 
 Only support `true` now.
 
-### save_mode [string]
-
-Storage mode, currently supports `overwrite`. This means we will delete the old file when a new file have a same name with it.
-
-If `is_enable_transaction` is `true`, Basically, we won't encounter the same file name. Because we will add the transaction id to file name.
-
-For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes).
-
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -156,17 +147,16 @@ For text file format
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "text"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
   }
 
 ```
@@ -176,22 +166,21 @@ For parquet file format
 ```hocon
 
   OssFile {
-    path="/seatunnel/sink"
+    path = "/seatunnel/sink"
     bucket = "oss://tyrantlucifer-image-bed"
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="parquet"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "parquet"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
   }
 
 ```
@@ -206,17 +195,16 @@ For orc file format
     access_key = "xxxxxxxxxxx"
     access_secret = "xxxxxxxxxxx"
     endpoint = "oss-cn-beijing.aliyuncs.com"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="orc"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
-    save_mode="error"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "orc"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
   }
 
 ```
diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/SftpFile.md
similarity index 73%
copy from docs/en/connector-v2/sink/FtpFile.md
copy to docs/en/connector-v2/sink/SftpFile.md
index c360be27b..328cb1950 100644
--- a/docs/en/connector-v2/sink/FtpFile.md
+++ b/docs/en/connector-v2/sink/SftpFile.md
@@ -1,15 +1,24 @@
-# FtpFile
+# SftpFile
 
-> Ftp file sink connector
+> Sftp file sink connector
 
 ## Description
 
-Output data to Ftp . 
+Output data to Sftp .
 
 ## Key features
 
 - [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use 2PC commit to ensure `exactly-once`
+
 - [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] file format
+    - [x] text
+    - [x] csv
+    - [x] parquet
+    - [x] orc
+    - [x] json
 
 ##  Options
 
@@ -26,28 +35,27 @@ Output data to Ftp .
 | field_delimiter                  | string  | no       | '\001'                                                    |
 | row_delimiter                    | string  | no       | "\n"                                                      |
 | partition_by                     | array   | no       | -                                                         |
-| partition_dir_expression         | string  | no       | "\${k0}=\${v0}\/\${k1}=\${v1}\/...\/\${kn}=\${vn}\/"      |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"                |
 | is_partition_field_write_in_file | boolean | no       | false                                                     |
 | sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns |
 | is_enable_transaction            | boolean | no       | true                                                      |
-| save_mode                        | string  | no       | "error"                                                   |
-| common-options                   |         | no       | -             |
+| common-options                   |         | no       | -                                                         |
 
 ### host [string]
 
-The target ftp host is required
+The target sftp host is required
 
 ### port [int]
 
-The target ftp port is required
+The target sftp port is required
 
 ### username [string]
 
-The target ftp username is required
+The target sftp username is required
 
 ### password [string]
 
-The target ftp password is required
+The target sftp password is required
 
 ### path [string]
 
@@ -84,7 +92,7 @@ When the format in the `file_name_expression` parameter is `xxxx-${now}` , `file
 
 ### field_delimiter [string]
 
-The separator between columns in a row of data. Only needed by `text` and `csv` file format.
+The separator between columns in a row of data. Only needed by `text` file format.
 
 ### row_delimiter [string]
 
@@ -119,14 +127,6 @@ Please note that, If `is_enable_transaction` is `true`, we will auto add `${tran
 
 Only support `true` now.
 
-### save_mode [string]
-
-Storage mode, currently supports `overwrite`. This means we will delete the old file when a new file have a same name with it.
-
-If `is_enable_transaction` is `true`, Basically, we won't encounter the same file name. Because we will add the transaction id to file name.
-
-For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)
-
 ### common options
 
 Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
@@ -137,33 +137,28 @@ For text file format
 
 ```bash
 
-FtpFile {
-    host="xxx.xxx.xxx.xxx"
-    port=21
-    username="username"
-    password="password"
-    path="/data/ftp"
-    field_delimiter="\t"
-    row_delimiter="\n"
-    partition_by=["age"]
-    partition_dir_expression="${k0}=${v0}"
-    is_partition_field_write_in_file=true
-    file_name_expression="${transactionId}_${now}"
-    file_format="text"
-    sink_columns=["name","age"]
-    filename_time_format="yyyy.MM.dd"
-    is_enable_transaction=true
+SftpFile {
+    host = "xxx.xxx.xxx.xxx"
+    port  =22
+    username = "username"
+    password = "password"
+    path = "/data/sftp"
+    field_delimiter = "\t"
+    row_delimiter = "\n"
+    partition_by = ["age"]
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "text"
+    sink_columns = ["name","age"]
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
 }
 
 ```
 
 ## Changelog
 
-### 2.2.0-beta 2022-09-26
-
-- Add Ftp File Sink Connector
-
-### 2.3.0-beta 2022-10-20
-- [BugFix] Fix the bug of incorrect path in windows environment ([2980](https://github.com/apache/incubator-seatunnel/pull/2980))
-- [BugFix] Fix filesystem get error ([3117](https://github.com/apache/incubator-seatunnel/pull/3117))
-- [BugFix] Solved the bug of can not parse '\t' as delimiter from config file ([3083](https://github.com/apache/incubator-seatunnel/pull/3083))
+| Version    | Date       | Pull Request                                                    | Subject      |
+|------------|------------|-----------------------------------------------------------------|--------------|
+| 2.2.0-beta | 2022-10-06 | [3006](https://github.com/apache/incubator-seatunnel/pull/3006) | First commit |
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/SftpFile.md b/docs/en/connector-v2/source/SftpFile.md
new file mode 100644
index 000000000..bab807787
--- /dev/null
+++ b/docs/en/connector-v2/source/SftpFile.md
@@ -0,0 +1,214 @@
+# SftpFile
+
+> Sftp file source connector
+
+## Description
+
+Read data from sftp file server.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [x] file format
+    - [x] text
+    - [x] csv
+    - [x] json
+
+## Options
+
+| name                        | type    | required | default value       |
+|-----------------------------|---------|----------|---------------------|
+| host                        | string  | yes      | -                   |
+| port                        | int     | yes      | -                   |
+| user                        | string  | yes      | -                   |
+| password                    | string  | yes      | -                   |
+| path                        | string  | yes      | -                   |
+| type                        | string  | yes      | -                   |
+| delimiter                   | string  | no       | \001                |
+| parse_partition_from_path   | boolean | no       | true                |
+| date_format                 | string  | no       | yyyy-MM-dd          |
+| datetime_format             | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format                 | string  | no       | HH:mm:ss            |
+| schema                      | config  | no       | -                   |
+| common-options              |         | no       | -                   |
+
+### host [string]
+
+The target sftp host is required
+
+### port [int]
+
+The target sftp port is required
+
+### username [string]
+
+The target sftp username is required
+
+### password [string]
+
+The target sftp password is required
+
+### path [string]
+
+The source file path.
+
+### delimiter [string]
+
+Field delimiter, used to tell connector how to slice and dice fields when reading text files
+
+default `\001`, the same as hive's default delimiter
+
+### parse_partition_from_path [boolean]
+
+Control whether parse the partition keys and values from file path
+
+For example if you read a file from path `sftp://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`
+
+Every record data from file will be added these two fields:
+
+| name           | age |
+|----------------|-----|
+| tyrantlucifer  | 26  |
+
+Tips: **Do not define partition fields in schema option**
+
+### date_format [string]
+
+Date type format, used to tell connector how to convert string to date, supported as the following formats:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+default `yyyy-MM-dd`
+
+### datetime_format [string]
+
+Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+default `yyyy-MM-dd HH:mm:ss`
+
+### time_format [string]
+
+Time type format, used to tell connector how to convert string to time, supported as the following formats:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+default `HH:mm:ss`
+
+### schema [config]
+
+The schema information of upstream data.
+
+### type [string]
+
+File type, supported as the following file types:
+
+`text` `csv` `json`
+
+If you assign file type to `json`, you should also assign schema option to tell connector how to parse data to the row you want.
+
+For example:
+
+upstream data is the following:
+
+```json
+
+{"code":  200, "data":  "get success", "success":  true}
+
+```
+
+you should assign schema as the following:
+
+```hocon
+
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
+
+connector will generate data as the following:
+
+| code | data        | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+If you assign file type to `text` `csv`, you can choose to specify the schema information or not.
+
+For example, upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
+
+```
+
+If you do not assign data schema connector will treat the upstream data as the following:
+
+| content                |
+|------------------------|
+| tyrantlucifer#26#male  | 
+
+If you assign data schema, you should also assign the option `delimiter` too except CSV file type
+
+
+you should assign schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string 
+    }
+}
+
+```
+
+connector will generate data as the following:
+
+| name          | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26  | male   |
+
+### common options
+
+Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+
+## Example
+
+```hocon
+
+  SftpFile {
+    path = "/tmp/seatunnel/sink/text"
+    host = "192.168.31.48"
+    port = 21
+    user = tyrantlucifer
+    password = tianchao
+    type = "text"
+    schema = {
+      name = string
+      age = int
+    }
+    delimiter = "#"
+  }
+
+```
+
+## Changelog
+
+| Version    | Date       | Pull Request                                                    | Subject      |
+|------------|------------|-----------------------------------------------------------------|--------------|
+| 2.2.0-beta | 2022-10-06 | [3006](https://github.com/apache/incubator-seatunnel/pull/3006) | First commit |
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index c2ce92848..e022fa59b 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -123,6 +123,8 @@ seatunnel.source.Neo4j = connector-neo4j
 seatunnel.sink.Neo4j = connector-neo4j
 seatunnel.source.FtpFile = connector-file-ftp
 seatunnel.sink.FtpFile = connector-file-ftp
+seatunnel.source.SftpFile = connector-file-sftp
+seatunnel.sink.SftpFile = connector-file-sftp
 seatunnel.sink.Socket = connector-socket
 seatunnel.source.Redis = connector-redis
 seatunnel.sink.Redis = connector-redis
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
index 9dbc34a5d..2aca9ded0 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
@@ -24,6 +24,7 @@ public enum FileSystemType implements Serializable {
     LOCAL("LocalFile"),
     OSS("OssFile"),
     FTP("FtpFile"),
+    SFTP("SftpFile"),
     S3("S3File");
 
     private final String fileSystemPluginName;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-sftp/pom.xml
new file mode 100644
index 000000000..6740f0eab
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/pom.xml
@@ -0,0 +1,45 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>connector-file</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-file-sftp</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2</artifactId>
+            <scope>provided</scope>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java
new file mode 100644
index 000000000..b210af878
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConf.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sftp.config;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.HashMap;
+
+public class SftpConf extends HadoopConf {
+
+    private SftpConf(String hdfsNameKey) {
+        super(hdfsNameKey);
+    }
+
+    public static HadoopConf buildWithConfig(Config config) {
+        String host = config.getString(SftpConfig.SFTP_HOST);
+        int port = config.getInt(SftpConfig.SFTP_PORT);
+        String defaultFS = String.format("sftp://%s:%s", host, port);
+        HadoopConf hadoopConf = new SftpConf(defaultFS);
+        HashMap<String, String> sftpOptions = new HashMap<>();
+        sftpOptions.put("fs.sftp.user." + host, config.getString(SftpConfig.SFTP_USERNAME));
+        sftpOptions.put("fs.sftp.password." + host + "." + config.getString(SftpConfig.SFTP_USERNAME), config.getString(SftpConfig.SFTP_PASSWORD));
+        sftpOptions.put("fs.sftp.impl", "org.apache.seatunnel.connectors.seatunnel.file.sftp.system.SFTPFileSystem");
+        hadoopConf.setExtraOptions(sftpOptions);
+        return hadoopConf;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConfig.java
similarity index 61%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
copy to seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConfig.java
index 9dbc34a5d..85981f72b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/config/SftpConfig.java
@@ -15,24 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.sftp.config;
 
-import java.io.Serializable;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 
-public enum FileSystemType implements Serializable {
-    HDFS("HdfsFile"),
-    LOCAL("LocalFile"),
-    OSS("OssFile"),
-    FTP("FtpFile"),
-    S3("S3File");
-
-    private final String fileSystemPluginName;
-
-    FileSystemType(String fileSystemPluginName) {
-        this.fileSystemPluginName = fileSystemPluginName;
-    }
-
-    public String getFileSystemPluginName() {
-        return fileSystemPluginName;
-    }
+public class SftpConfig extends BaseSourceConfig {
+    public static final String SFTP_PASSWORD = "password";
+    public static final String SFTP_USERNAME = "user";
+    public static final String SFTP_HOST = "host";
+    public static final String SFTP_PORT = "port";
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
new file mode 100644
index 000000000..59f98fa23
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class SftpFileSink extends BaseFileSink {
+    @Override
+    public String getPluginName() {
+        return FileSystemType.SFTP.getFileSystemPluginName();
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+                SftpConfig.SFTP_HOST, SftpConfig.SFTP_PORT,
+                SftpConfig.SFTP_USERNAME, SftpConfig.SFTP_PASSWORD);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        super.prepare(pluginConfig);
+        hadoopConf = SftpConf.buildWithConfig(pluginConfig);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
new file mode 100644
index 000000000..7bf20ebeb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sftp.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSource.class)
+public class SftpFileSource extends BaseFileSource {
+    @Override
+    public String getPluginName() {
+        return FileSystemType.SFTP.getFileSystemPluginName();
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, SftpConfig.FILE_PATH, SftpConfig.FILE_TYPE,
+                SftpConfig.SFTP_HOST, SftpConfig.SFTP_PORT,
+                SftpConfig.SFTP_USERNAME, SftpConfig.SFTP_PASSWORD);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(SftpConfig.FILE_TYPE).toUpperCase());
+        if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Sftp file source connector only support read [text, csv, json] files");
+        }
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_TYPE));
+        readStrategy.setPluginConfig(pluginConfig);
+        String path = pluginConfig.getString(SftpConfig.FILE_PATH);
+        hadoopConf = SftpConf.buildWithConfig(pluginConfig);
+        try {
+            filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
+        } catch (IOException e) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
+        }
+        // support user-defined schema
+        // only json csv text type support user-defined schema now
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+            switch (fileFormat) {
+                case CSV:
+                case TEXT:
+                case JSON:
+                    Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
+                            .buildWithConfig(schemaConfig)
+                            .getSeaTunnelRowType();
+                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
+                    break;
+                case ORC:
+                case PARQUET:
+                    throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                default:
+                    // never got in there
+                    throw new UnsupportedOperationException("SeaTunnel does not supported this file format");
+            }
+        } else {
+            try {
+                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
+            } catch (FilePluginException e) {
+                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPConnectionPool.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPConnectionPool.java
new file mode 100644
index 000000000..5363ee74f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPConnectionPool.java
@@ -0,0 +1,302 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sftp.system;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSch;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.hadoop.util.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.Set;
+
+public class SFTPConnectionPool {
+
+    public static final Logger LOG =
+            LoggerFactory.getLogger(SFTPFileSystem.class);
+    // Maximum number of allowed live connections. This doesn't mean we cannot
+    // have more live connections. It means that when we have more
+    // live connections than this threshold, any unused connection will be
+    // closed.
+    private int maxConnection;
+    private int liveConnectionCount;
+    private HashMap<ConnectionInfo, HashSet<ChannelSftp>> idleConnections =
+            new HashMap<ConnectionInfo, HashSet<ChannelSftp>>();
+    private HashMap<ChannelSftp, ConnectionInfo> con2infoMap =
+            new HashMap<ChannelSftp, ConnectionInfo>();
+
+    SFTPConnectionPool(int maxConnection, int liveConnectionCount) {
+        this.maxConnection = maxConnection;
+        this.liveConnectionCount = liveConnectionCount;
+    }
+
+    synchronized ChannelSftp getFromPool(ConnectionInfo info) throws IOException {
+        Set<ChannelSftp> cons = idleConnections.get(info);
+        ChannelSftp channel;
+
+        if (cons != null && cons.size() > 0) {
+            Iterator<ChannelSftp> it = cons.iterator();
+            if (it.hasNext()) {
+                channel = it.next();
+                idleConnections.remove(info);
+                return channel;
+            } else {
+                throw new IOException("Connection pool error.");
+            }
+        }
+        return null;
+    }
+
+    synchronized void returnToPool(ChannelSftp channel) {
+        ConnectionInfo info = con2infoMap.get(channel);
+        HashSet<ChannelSftp> cons = idleConnections.get(info);
+        if (cons == null) {
+            cons = new HashSet<ChannelSftp>();
+            idleConnections.put(info, cons);
+        }
+        cons.add(channel);
+
+    }
+
+    /**
+     * Shutdown the connection pool and close all open connections.
+     */
+    synchronized void shutdown() {
+        if (this.con2infoMap == null) {
+            return; // already shutdown in case it is called
+        }
+        LOG.info("Inside shutdown, con2infoMap size=" + con2infoMap.size());
+
+        this.maxConnection = 0;
+        Set<ChannelSftp> cons = con2infoMap.keySet();
+        if (cons != null && cons.size() > 0) {
+            // make a copy since we need to modify the underlying Map
+            Set<ChannelSftp> copy = new HashSet<ChannelSftp>(cons);
+            // Initiate disconnect from all outstanding connections
+            for (ChannelSftp con : copy) {
+                try {
+                    disconnect(con);
+                } catch (IOException ioe) {
+                    ConnectionInfo info = con2infoMap.get(con);
+                    LOG.error(
+                            "Error encountered while closing connection to " + info.getHost(),
+                            ioe);
+                }
+            }
+        }
+        // make sure no further connections can be returned.
+        this.idleConnections = null;
+        this.con2infoMap = null;
+    }
+
+    public synchronized int getMaxConnection() {
+        return maxConnection;
+    }
+
+    public synchronized void setMaxConnection(int maxConn) {
+        this.maxConnection = maxConn;
+    }
+
+    public ChannelSftp connect(String host, int port, String user,
+                               String password, String keyFile) throws IOException {
+        // get connection from pool
+        ConnectionInfo info = new ConnectionInfo(host, port, user);
+        ChannelSftp channel = getFromPool(info);
+
+        if (channel != null) {
+            if (channel.isConnected()) {
+                return channel;
+            } else {
+                channel = null;
+                synchronized (this) {
+                    --liveConnectionCount;
+                    con2infoMap.remove(channel);
+                }
+            }
+        }
+
+        // create a new connection and add to pool
+        JSch jsch = new JSch();
+        Session session = null;
+        try {
+            if (user == null || user.length() == 0) {
+                user = System.getProperty("user.name");
+            }
+
+            if (password == null) {
+                password = "";
+            }
+
+            if (keyFile != null && keyFile.length() > 0) {
+                jsch.addIdentity(keyFile);
+            }
+
+            if (port <= 0) {
+                session = jsch.getSession(user, host);
+            } else {
+                session = jsch.getSession(user, host, port);
+            }
+
+            session.setPassword(password);
+
+            java.util.Properties config = new java.util.Properties();
+            config.put("StrictHostKeyChecking", "no");
+            session.setConfig(config);
+
+            session.connect();
+            channel = (ChannelSftp) session.openChannel("sftp");
+            channel.connect();
+
+            synchronized (this) {
+                con2infoMap.put(channel, info);
+                liveConnectionCount++;
+            }
+
+            return channel;
+
+        } catch (JSchException e) {
+            throw new IOException(StringUtils.stringifyException(e));
+        }
+    }
+
+    void disconnect(ChannelSftp channel) throws IOException {
+        if (channel != null) {
+            // close connection if too many active connections
+            boolean closeConnection = false;
+            synchronized (this) {
+                if (liveConnectionCount > maxConnection) {
+                    --liveConnectionCount;
+                    con2infoMap.remove(channel);
+                    closeConnection = true;
+                }
+            }
+            if (closeConnection) {
+                if (channel.isConnected()) {
+                    try {
+                        Session session = channel.getSession();
+                        channel.disconnect();
+                        session.disconnect();
+                    } catch (JSchException e) {
+                        throw new IOException(StringUtils.stringifyException(e));
+                    }
+                }
+
+            } else {
+                returnToPool(channel);
+            }
+        }
+    }
+
+    public int getIdleCount() {
+        return this.idleConnections.size();
+    }
+
+    public int getLiveConnCount() {
+        return this.liveConnectionCount;
+    }
+
+    public int getConnPoolSize() {
+        return this.con2infoMap.size();
+    }
+
+    /**
+     * Class to capture the minimal set of information that distinguish
+     * between different connections.
+     */
+    static class ConnectionInfo {
+        private String host = "";
+        private int port;
+        private String user = "";
+
+        ConnectionInfo(String hst, int prt, String usr) {
+            this.host = hst;
+            this.port = prt;
+            this.user = usr;
+        }
+
+        public String getHost() {
+            return host;
+        }
+
+        public void setHost(String hst) {
+            this.host = hst;
+        }
+
+        public int getPort() {
+            return port;
+        }
+
+        public void setPort(int prt) {
+            this.port = prt;
+        }
+
+        public String getUser() {
+            return user;
+        }
+
+        public void setUser(String usr) {
+            this.user = usr;
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (this == obj) {
+                return true;
+            }
+
+            if (obj instanceof ConnectionInfo) {
+                ConnectionInfo con = (ConnectionInfo) obj;
+
+                boolean ret = true;
+                if (this.host == null || !this.host.equalsIgnoreCase(con.host)) {
+                    ret = false;
+                }
+                if (this.port >= 0 && this.port != con.port) {
+                    ret = false;
+                }
+                if (this.user == null || !this.user.equalsIgnoreCase(con.user)) {
+                    ret = false;
+                }
+                return ret;
+            } else {
+                return false;
+            }
+
+        }
+
+        @Override
+        public int hashCode() {
+            int hashCode = 0;
+            if (host != null) {
+                hashCode += host.hashCode();
+            }
+            hashCode += port;
+            if (user != null) {
+                hashCode += user.hashCode();
+            }
+            return hashCode;
+        }
+
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
new file mode 100644
index 000000000..d3fc782d9
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPFileSystem.java
@@ -0,0 +1,619 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sftp.system;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.ChannelSftp.LsEntry;
+import com.jcraft.jsch.SftpATTRS;
+import com.jcraft.jsch.SftpException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.permission.FsPermission;
+import org.apache.hadoop.util.Progressable;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.URI;
+import java.net.URLDecoder;
+import java.util.ArrayList;
+import java.util.Vector;
+
+/**
+ * SFTP FileSystem.
+ */
+public class SFTPFileSystem extends FileSystem {
+
+    public static final Logger LOG =
+            LoggerFactory.getLogger(SFTPFileSystem.class);
+
+    private SFTPConnectionPool connectionPool;
+    private URI uri;
+
+    private static final int DEFAULT_SFTP_PORT = 22;
+    public static final int DEFAULT_MAX_CONNECTION = 5;
+    public static final int DEFAULT_BUFFER_SIZE = 1024 * 1024;
+    public static final int DEFAULT_BLOCK_SIZE = 4 * 1024;
+    public static final String FS_SFTP_USER_PREFIX = "fs.sftp.user.";
+    public static final String FS_SFTP_PASSWORD_PREFIX = "fs.sftp.password.";
+    public static final String FS_SFTP_HOST = "fs.sftp.host";
+    public static final String FS_SFTP_HOST_PORT = "fs.sftp.host.port";
+    public static final String FS_SFTP_KEYFILE = "fs.sftp.keyfile";
+    public static final String FS_SFTP_CONNECTION_MAX = "fs.sftp.connection.max";
+    public static final String E_SAME_DIRECTORY_ONLY =
+            "only same directory renames are supported";
+    public static final String E_HOST_NULL = "Invalid host specified";
+    public static final String E_USER_NULL =
+            "No user specified for sftp connection. Expand URI or credential file.";
+    public static final String E_PATH_DIR = "Path %s is a directory.";
+    public static final String E_FILE_STATUS = "Failed to get file status";
+    public static final String E_FILE_NOTFOUND = "File %s does not exist.";
+    public static final String E_FILE_EXIST = "File already exists: %s";
+    public static final String E_CREATE_DIR =
+            "create(): Mkdirs failed to create: %s";
+    public static final String E_DIR_CREATE_FROMFILE =
+            "Can't make directory for path %s since it is a file.";
+    public static final String E_MAKE_DIR_FORPATH =
+            "Can't make directory for path \"%s\" under \"%s\".";
+    public static final String E_DIR_NOTEMPTY = "Directory: %s is not empty.";
+    public static final String E_FILE_CHECK_FAILED = "File check failed";
+    public static final String E_NOT_SUPPORTED = "Not supported";
+    public static final String E_SPATH_NOTEXIST = "Source path %s does not exist";
+    public static final String E_DPATH_EXIST =
+            "Destination path %s already exist, cannot rename!";
+    public static final String E_FAILED_GETHOME = "Failed to get home directory";
+    public static final String E_FAILED_DISCONNECT = "Failed to disconnect";
+
+    private void setConfigurationFromURI(URI uriInfo, Configuration conf)
+            throws IOException {
+
+        // get host information from URI
+        String host = uriInfo.getHost();
+        host = (host == null) ? conf.get(FS_SFTP_HOST, null) : host;
+        if (host == null) {
+            throw new IOException(E_HOST_NULL);
+        }
+        conf.set(FS_SFTP_HOST, host);
+
+        int port = uriInfo.getPort();
+        port = (port == -1) ? conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT) : port;
+        conf.setInt(FS_SFTP_HOST_PORT, port);
+
+        // get user/password information from URI
+        String userAndPwdFromUri = uriInfo.getUserInfo();
+        if (userAndPwdFromUri != null) {
+            String[] userPasswdInfo = userAndPwdFromUri.split(":");
+            String user = userPasswdInfo[0];
+            user = URLDecoder.decode(user, "UTF-8");
+            conf.set(FS_SFTP_USER_PREFIX + host, user);
+            if (userPasswdInfo.length > 1) {
+                conf.set(FS_SFTP_PASSWORD_PREFIX + host + "." +
+                        user, userPasswdInfo[1]);
+            }
+        }
+
+        String user = conf.get(FS_SFTP_USER_PREFIX + host);
+        if (user == null || user.equals("")) {
+            throw new IllegalStateException(E_USER_NULL);
+        }
+
+        int connectionMax =
+                conf.getInt(FS_SFTP_CONNECTION_MAX, DEFAULT_MAX_CONNECTION);
+        connectionPool = new SFTPConnectionPool(connectionMax, connectionMax);
+    }
+
+    private ChannelSftp connect() throws IOException {
+        Configuration conf = getConf();
+
+        String host = conf.get(FS_SFTP_HOST, null);
+        int port = conf.getInt(FS_SFTP_HOST_PORT, DEFAULT_SFTP_PORT);
+        String user = conf.get(FS_SFTP_USER_PREFIX + host, null);
+        String pwd = conf.get(FS_SFTP_PASSWORD_PREFIX + host + "." + user, null);
+        String keyFile = conf.get(FS_SFTP_KEYFILE, null);
+
+        ChannelSftp channel =
+                connectionPool.connect(host, port, user, pwd, keyFile);
+
+        return channel;
+    }
+
+    private void disconnect(ChannelSftp channel) throws IOException {
+        connectionPool.disconnect(channel);
+    }
+
+    private Path makeAbsolute(Path workDir, Path path) {
+        if (path.isAbsolute()) {
+            return path;
+        }
+        return new Path(workDir, path);
+    }
+
+    private boolean exists(ChannelSftp channel, Path file) throws IOException {
+        try {
+            getFileStatus(channel, file);
+            return true;
+        } catch (FileNotFoundException fnfe) {
+            return false;
+        } catch (IOException ioe) {
+            throw new IOException(E_FILE_STATUS, ioe);
+        }
+    }
+
+    /**
+     * Convenience method, so that we don't open a new connection when using this
+     * method from within another method. Otherwise every API invocation incurs
+     * the overhead of opening/closing a TCP connection.
+     */
+    @SuppressWarnings("unchecked")
+    private FileStatus getFileStatus(ChannelSftp client, Path file)
+            throws IOException {
+        FileStatus fileStat = null;
+        Path workDir;
+        try {
+            workDir = new Path(client.pwd());
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+        Path absolute = makeAbsolute(workDir, file);
+        Path parentPath = absolute.getParent();
+        if (parentPath == null) { // root directory
+            long length = -1; // Length of root directory on server not known
+            boolean isDir = true;
+            int blockReplication = 1;
+            long blockSize = DEFAULT_BLOCK_SIZE; // Block Size not known.
+            long modTime = -1; // Modification time of root directory not known.
+            Path root = new Path("/");
+            return new FileStatus(length, isDir, blockReplication, blockSize,
+                    modTime,
+                    root.makeQualified(this.getUri(), this.getWorkingDirectory()));
+        }
+        String pathName = parentPath.toUri().getPath();
+        Vector<LsEntry> sftpFiles;
+        try {
+            sftpFiles = (Vector<LsEntry>) client.ls(pathName);
+        } catch (SftpException e) {
+            throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
+        }
+        if (sftpFiles != null) {
+            for (LsEntry sftpFile : sftpFiles) {
+                if (sftpFile.getFilename().equals(file.getName())) {
+                    // file found in directory
+                    fileStat = getFileStatus(client, sftpFile, parentPath);
+                    break;
+                }
+            }
+            if (fileStat == null) {
+                throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
+            }
+        } else {
+            throw new FileNotFoundException(String.format(E_FILE_NOTFOUND, file));
+        }
+        return fileStat;
+    }
+
+    @SuppressWarnings("checkstyle:MagicNumber")
+    private FileStatus getFileStatus(ChannelSftp channel, LsEntry sftpFile,
+                                     Path parentPath) throws IOException {
+
+        SftpATTRS attr = sftpFile.getAttrs();
+        long length = attr.getSize();
+        boolean isDir = attr.isDir();
+        boolean isLink = attr.isLink();
+        if (isLink) {
+            String link = parentPath.toUri().getPath() + "/" + sftpFile.getFilename();
+            try {
+                link = channel.realpath(link);
+
+                Path linkParent = new Path("/", link);
+
+                FileStatus fstat = getFileStatus(channel, linkParent);
+                isDir = fstat.isDirectory();
+                length = fstat.getLen();
+            } catch (Exception e) {
+                throw new IOException(e);
+            }
+        }
+        int blockReplication = 1;
+        // Using default block size since there is no way in SFTP channel to know of
+        // block sizes on server. The assumption could be less than ideal.
+        long blockSize = DEFAULT_BLOCK_SIZE;
+        long modTime = attr.getMTime() * 1000L; // convert to milliseconds
+        long accessTime = attr.getATime() * 1000L;
+        FsPermission permission = getPermissions(sftpFile);
+        // not be able to get the real user group name, just use the user and group
+        // id
+        String user = Integer.toString(attr.getUId());
+        String group = Integer.toString(attr.getGId());
+        Path filePath = new Path(parentPath, sftpFile.getFilename());
+
+        return new FileStatus(length, isDir, blockReplication, blockSize, modTime,
+                accessTime, permission, user, group, filePath.makeQualified(
+                this.getUri(), this.getWorkingDirectory()));
+    }
+
+    private FsPermission getPermissions(LsEntry sftpFile) {
+        return new FsPermission((short) sftpFile.getAttrs().getPermissions());
+    }
+
+    /**
+     * Convenience method, so that we don't open a new connection when using this
+     * method from within another method. Otherwise every API invocation incurs
+     * the overhead of opening/closing a TCP connection.
+     */
+    private boolean mkdirs(ChannelSftp client, Path file, FsPermission permission)
+            throws IOException {
+        boolean created = true;
+        Path workDir;
+        try {
+            workDir = new Path(client.pwd());
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+        Path absolute = makeAbsolute(workDir, file);
+        String pathName = absolute.getName();
+        if (!exists(client, absolute)) {
+            Path parent = absolute.getParent();
+            created = parent == null || mkdirs(client, parent, FsPermission.getDefault());
+            if (created) {
+                String parentDir = parent.toUri().getPath();
+                boolean succeeded = true;
+                try {
+                    final String previousCwd = client.pwd();
+                    client.cd(parentDir);
+                    client.mkdir(pathName);
+                    client.cd(previousCwd);
+                } catch (SftpException e) {
+                    throw new IOException(String.format(E_MAKE_DIR_FORPATH, pathName,
+                            parentDir));
+                }
+                created = created & succeeded;
+            }
+        } else if (isFile(client, absolute)) {
+            throw new IOException(String.format(E_DIR_CREATE_FROMFILE, absolute));
+        }
+        return created;
+    }
+
+    private boolean isFile(ChannelSftp channel, Path file) throws IOException {
+        try {
+            return !getFileStatus(channel, file).isDirectory();
+        } catch (FileNotFoundException e) {
+            return false; // file does not exist
+        } catch (IOException ioe) {
+            throw new IOException(E_FILE_CHECK_FAILED, ioe);
+        }
+    }
+
+    /**
+     * Convenience method, so that we don't open a new connection when using this
+     * method from within another method. Otherwise every API invocation incurs
+     * the overhead of opening/closing a TCP connection.
+     */
+    private boolean delete(ChannelSftp channel, Path file, boolean recursive)
+            throws IOException {
+        Path workDir;
+        try {
+            workDir = new Path(channel.pwd());
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+        Path absolute = makeAbsolute(workDir, file);
+        String pathName = absolute.toUri().getPath();
+        FileStatus fileStat = null;
+        try {
+            fileStat = getFileStatus(channel, absolute);
+        } catch (FileNotFoundException e) {
+            // file not found, no need to delete, return true
+            return false;
+        }
+        if (!fileStat.isDirectory()) {
+            boolean status = true;
+            try {
+                channel.rm(pathName);
+            } catch (SftpException e) {
+                status = false;
+            }
+            return status;
+        } else {
+            boolean status = true;
+            FileStatus[] dirEntries = listStatus(channel, absolute);
+            if (dirEntries != null && dirEntries.length > 0) {
+                if (!recursive) {
+                    throw new IOException(String.format(E_DIR_NOTEMPTY, file));
+                }
+                for (int i = 0; i < dirEntries.length; ++i) {
+                    delete(channel, new Path(absolute, dirEntries[i].getPath()),
+                            recursive);
+                }
+            }
+            try {
+                channel.rmdir(pathName);
+            } catch (SftpException e) {
+                status = false;
+            }
+            return status;
+        }
+    }
+
+    /**
+     * Convenience method, so that we don't open a new connection when using this
+     * method from within another method. Otherwise every API invocation incurs
+     * the overhead of opening/closing a TCP connection.
+     */
+    @SuppressWarnings("unchecked")
+    private FileStatus[] listStatus(ChannelSftp client, Path file)
+            throws IOException {
+        Path workDir;
+        try {
+            workDir = new Path(client.pwd());
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+        Path absolute = makeAbsolute(workDir, file);
+        FileStatus fileStat = getFileStatus(client, absolute);
+        if (!fileStat.isDirectory()) {
+            return new FileStatus[]{fileStat};
+        }
+        Vector<LsEntry> sftpFiles;
+        try {
+            sftpFiles = (Vector<LsEntry>) client.ls(absolute.toUri().getPath());
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+        ArrayList<FileStatus> fileStats = new ArrayList<FileStatus>();
+        for (int i = 0; i < sftpFiles.size(); i++) {
+            LsEntry entry = sftpFiles.get(i);
+            String fname = entry.getFilename();
+            // skip current and parent directory, ie. "." and ".."
+            if (!".".equalsIgnoreCase(fname) && !"..".equalsIgnoreCase(fname)) {
+                fileStats.add(getFileStatus(client, entry, absolute));
+            }
+        }
+        return fileStats.toArray(new FileStatus[fileStats.size()]);
+    }
+
+    private boolean rename(ChannelSftp channel, Path src, Path dst)
+            throws IOException {
+        Path workDir;
+        try {
+            workDir = new Path(channel.pwd());
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+        Path absoluteSrc = makeAbsolute(workDir, src);
+        Path absoluteDst = makeAbsolute(workDir, dst);
+
+        if (!exists(channel, absoluteSrc)) {
+            throw new IOException(String.format(E_SPATH_NOTEXIST, src));
+        }
+        if (exists(channel, absoluteDst)) {
+            throw new IOException(String.format(E_DPATH_EXIST, dst));
+        }
+        boolean renamed = true;
+        try {
+            final String previousCwd = channel.pwd();
+            channel.cd("/");
+            channel.rename(src.toUri().getPath(), dst.toUri().getPath());
+            channel.cd(previousCwd);
+        } catch (SftpException e) {
+            renamed = false;
+        }
+        return renamed;
+    }
+
+    @Override
+    public void initialize(URI uriInfo, Configuration conf) throws IOException {
+        super.initialize(uriInfo, conf);
+
+        setConfigurationFromURI(uriInfo, conf);
+        setConf(conf);
+        this.uri = uriInfo;
+    }
+
+    @Override
+    public String getScheme() {
+        return "sftp";
+    }
+
+    @Override
+    public URI getUri() {
+        return uri;
+    }
+
+    @Override
+    public FSDataInputStream open(Path f, int bufferSize) throws IOException {
+        ChannelSftp channel = connect();
+        Path workDir;
+        try {
+            workDir = new Path(channel.pwd());
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+        Path absolute = makeAbsolute(workDir, f);
+        FileStatus fileStat = getFileStatus(channel, absolute);
+        if (fileStat.isDirectory()) {
+            disconnect(channel);
+            throw new IOException(String.format(E_PATH_DIR, f));
+        }
+        InputStream is;
+        try {
+            // the path could be a symbolic link, so get the real path
+            absolute = new Path("/", channel.realpath(absolute.toUri().getPath()));
+
+            is = channel.get(absolute.toUri().getPath());
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+
+        FSDataInputStream fis =
+                new FSDataInputStream(new SFTPInputStream(is, channel, statistics));
+        return fis;
+    }
+
+    /**
+     * A stream obtained via this call must be closed before using other APIs of
+     * this class or else the invocation will block.
+     */
+    @Override
+    public FSDataOutputStream create(Path f, FsPermission permission,
+                                     boolean overwrite, int bufferSize, short replication, long blockSize,
+                                     Progressable progress) throws IOException {
+        final ChannelSftp client = connect();
+        Path workDir;
+        try {
+            workDir = new Path(client.pwd());
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+        Path absolute = makeAbsolute(workDir, f);
+        if (exists(client, f)) {
+            if (overwrite) {
+                delete(client, f, false);
+            } else {
+                disconnect(client);
+                throw new IOException(String.format(E_FILE_EXIST, f));
+            }
+        }
+        Path parent = absolute.getParent();
+        if (parent == null || !mkdirs(client, parent, FsPermission.getDefault())) {
+            parent = (parent == null) ? new Path("/") : parent;
+            disconnect(client);
+            throw new IOException(String.format(E_CREATE_DIR, parent));
+        }
+        OutputStream os;
+        try {
+            final String previousCwd = client.pwd();
+            client.cd(parent.toUri().getPath());
+            os = client.put(f.getName());
+            client.cd(previousCwd);
+        } catch (SftpException e) {
+            throw new IOException(e);
+        }
+        FSDataOutputStream fos = new FSDataOutputStream(os, statistics) {
+            @Override
+            public void close() throws IOException {
+                super.close();
+                disconnect(client);
+            }
+        };
+
+        return fos;
+    }
+
+    @Override
+    public FSDataOutputStream append(Path f, int bufferSize,
+                                     Progressable progress)
+            throws IOException {
+        throw new IOException(E_NOT_SUPPORTED);
+    }
+
+    /*
+     * The parent of source and destination can be different. It is suppose to
+     * work like 'move'
+     */
+    @Override
+    public boolean rename(Path src, Path dst) throws IOException {
+        ChannelSftp channel = connect();
+        try {
+            boolean success = rename(channel, src, dst);
+            return success;
+        } finally {
+            disconnect(channel);
+        }
+    }
+
+    @Override
+    public boolean delete(Path f, boolean recursive) throws IOException {
+        ChannelSftp channel = connect();
+        try {
+            boolean success = delete(channel, f, recursive);
+            return success;
+        } finally {
+            disconnect(channel);
+        }
+    }
+
+    @Override
+    public FileStatus[] listStatus(Path f) throws IOException {
+        ChannelSftp client = connect();
+        try {
+            FileStatus[] stats = listStatus(client, f);
+            return stats;
+        } finally {
+            disconnect(client);
+        }
+    }
+
+    @Override
+    public void setWorkingDirectory(Path newDir) {
+        // we do not maintain the working directory state
+    }
+
+    @Override
+    public Path getWorkingDirectory() {
+        // Return home directory always since we do not maintain state.
+        return getHomeDirectory();
+    }
+
+    @Override
+    public Path getHomeDirectory() {
+        ChannelSftp channel = null;
+        try {
+            channel = connect();
+            Path homeDir = new Path(channel.pwd());
+            return homeDir;
+        } catch (Exception ioe) {
+            return null;
+        } finally {
+            try {
+                disconnect(channel);
+            } catch (IOException ioe) {
+                return null;
+            }
+        }
+    }
+
+    @Override
+    public boolean mkdirs(Path f, FsPermission permission) throws IOException {
+        ChannelSftp client = connect();
+        try {
+            boolean success = mkdirs(client, f, permission);
+            return success;
+        } finally {
+            disconnect(client);
+        }
+    }
+
+    @Override
+    public FileStatus getFileStatus(Path f) throws IOException {
+        ChannelSftp channel = connect();
+        try {
+            FileStatus status = getFileStatus(channel, f);
+            return status;
+        } finally {
+            disconnect(channel);
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPInputStream.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPInputStream.java
new file mode 100644
index 000000000..ba6523d94
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/system/SFTPInputStream.java
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sftp.system;
+
+import com.jcraft.jsch.ChannelSftp;
+import com.jcraft.jsch.JSchException;
+import com.jcraft.jsch.Session;
+import org.apache.hadoop.fs.FSInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.util.StringUtils;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+/**
+ * SFTP FileSystem input stream.
+ */
+public class SFTPInputStream extends FSInputStream {
+
+    public static final String E_SEEK_NOT_SUPPORTED = "Seek not supported";
+    public static final String E_CLIENT_NULL =
+            "SFTP client null or not connected";
+    public static final String E_NULL_INPUT_STREAM = "Null InputStream";
+    public static final String E_STREAM_CLOSED = "Stream closed";
+    public static final String E_CLIENT_NOT_CONNECTED = "Client not connected";
+
+    private InputStream wrappedStream;
+    private ChannelSftp channel;
+    private FileSystem.Statistics stats;
+    private boolean closed;
+    private long pos;
+
+    SFTPInputStream(InputStream stream, ChannelSftp channel,
+                    FileSystem.Statistics stats) {
+
+        if (stream == null) {
+            throw new IllegalArgumentException(E_NULL_INPUT_STREAM);
+        }
+        if (channel == null || !channel.isConnected()) {
+            throw new IllegalArgumentException(E_CLIENT_NULL);
+        }
+        this.wrappedStream = stream;
+        this.channel = channel;
+        this.stats = stats;
+
+        this.pos = 0;
+        this.closed = false;
+    }
+
+    @Override
+    public void seek(long position) throws IOException {
+        throw new IOException(E_SEEK_NOT_SUPPORTED);
+    }
+
+    @Override
+    public boolean seekToNewSource(long targetPos) throws IOException {
+        throw new IOException(E_SEEK_NOT_SUPPORTED);
+    }
+
+    @Override
+    public long getPos() throws IOException {
+        return pos;
+    }
+
+    @Override
+    public synchronized int read() throws IOException {
+        if (closed) {
+            throw new IOException(E_STREAM_CLOSED);
+        }
+
+        int byteRead = wrappedStream.read();
+        if (byteRead >= 0) {
+            pos++;
+        }
+        if (stats != null & byteRead >= 0) {
+            stats.incrementBytesRead(1);
+        }
+        return byteRead;
+    }
+
+    public synchronized int read(byte[] buf, int off, int len)
+            throws IOException {
+        if (closed) {
+            throw new IOException(E_STREAM_CLOSED);
+        }
+
+        int result = wrappedStream.read(buf, off, len);
+        if (result > 0) {
+            pos += result;
+        }
+        if (stats != null & result > 0) {
+            stats.incrementBytesRead(result);
+        }
+
+        return result;
+    }
+
+    public synchronized void close() throws IOException {
+        if (closed) {
+            return;
+        }
+        super.close();
+        closed = true;
+        if (!channel.isConnected()) {
+            throw new IOException(E_CLIENT_NOT_CONNECTED);
+        }
+
+        try {
+            Session session = channel.getSession();
+            channel.disconnect();
+            session.disconnect();
+        } catch (JSchException e) {
+            throw new IOException(StringUtils.stringifyException(e));
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 000000000..a1616a494
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.seatunnel.connectors.seatunnel.file.sftp.system.SFTPFileSystem
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index 60d73ab9d..773ae5143 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -40,6 +40,7 @@
         <module>connector-file-oss</module>
         <module>connector-file-ftp</module>
         <module>connector-file-base-hadoop</module>
+        <module>connector-file-sftp</module>
         <module>connector-file-s3</module>
     </modules>
 
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index ef635378f..4c602b04e 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -200,6 +200,12 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-file-sftp</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
                 <dependency>
                     <groupId>org.apache.seatunnel</groupId>
                     <artifactId>connector-hudi</artifactId>