You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/23 08:08:47 UTC

[incubator-seatunnel] branch dev updated: [Feature][File connector] Support ftp file sink (#2483)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new a87e5de80 [Feature][File connector] Support ftp file sink (#2483)
a87e5de80 is described below

commit a87e5de80a4ee307e61961586726a510846c4435
Author: chessplay <27...@qq.com>
AuthorDate: Tue Aug 23 16:08:40 2022 +0800

    [Feature][File connector] Support ftp file sink (#2483)
    
    
    Co-authored-by: zhoulw11 <zh...@chinatelecom.cn>
---
 docs/en/connector-v2/sink/FtpFile.md               | 148 +++++++++++++++++
 plugin-mapping.properties                          |   1 +
 pom.xml                                            |   9 ++
 seatunnel-connectors-v2-dist/pom.xml               |   5 +
 .../connector-file/connector-file-base/pom.xml     |   5 +
 .../seatunnel/file/config/FileSystemType.java      |   3 +-
 .../file/sink/FileSinkWriterWithTransaction.java   |  64 ++++----
 .../file/sink/TransactionStateFileSinkWriter.java  |  64 ++++----
 .../seatunnel/file/sink/config/FileSystemType.java |   3 +-
 .../{ => connector-file-ftp}/pom.xml               |  25 +--
 .../seatunnel/file/sink/ftp/FtpFileSink.java       |  63 ++++++++
 .../seatunnel/file/sink/ftp/FtpFileSinkPlugin.java |  71 ++++++++
 .../seatunnel/file/sink/ftp/config/FtpConfig.java} |  22 +--
 .../file/sink/ftp/filesystem/FtpFileSystem.java}   |  26 +--
 .../ftp/filesystem/FtpFileSystemCommitter.java     |  57 +++++++
 .../seatunnel/file/sink/ftp/util/FtpFileUtils.java | 179 +++++++++++++++++++++
 .../FtpTransactionStateFileWriteFactory.java       |  78 +++++++++
 .../writer/FtpTxtTransactionStateFileWriter.java   | 125 ++++++++++++++
 seatunnel-connectors-v2/connector-file/pom.xml     |   1 +
 seatunnel-dist/release-docs/LICENSE                |   4 +-
 seatunnel-dist/release-docs/NOTICE                 |   8 +
 .../seatunnel-flink-connector-v2-example/pom.xml   |   5 +
 tools/dependencies/known-dependencies.txt          |   1 -
 23 files changed, 861 insertions(+), 106 deletions(-)

diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md
new file mode 100644
index 000000000..0384671c3
--- /dev/null
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -0,0 +1,148 @@
+# FtpFile
+
+> Ftp file sink connector
+
+## Description
+
+Output data to an FTP server.
+
+
+
+| name                             | type    | required | default value                                             |
+|----------------------------------|---------|----------|-----------------------------------------------------------|
+| ftp_host                         | string  | yes      | -                                                         |
+| ftp_port                         | int     | yes      | -                                                         |
+| ftp_username                     | string  | yes      | -                                                         |
+| ftp_password                     | string  | yes      | -                                                         |
+| path                             | string  | yes      | -                                                         |
+| file_name_expression             | string  | no       | "${transactionId}"                                        |
+| file_format                      | string  | no       | "text"                                                    |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                                              |
+| field_delimiter                  | string  | no       | '\001'                                                    |
+| row_delimiter                    | string  | no       | "\n"                                                      |
+| partition_by                     | array   | no       | -                                                         |
+| partition_dir_expression         | string  | no       | "\${k0}=\${v0}\/\${k1}=\${v1}\/...\/\${kn}=\${vn}\/"      |
+| is_partition_field_write_in_file | boolean | no       | false                                                     |
+| sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction            | boolean | no       | true                                                      |
+| save_mode                        | string  | no       | "error"                                                   |
+
+### ftp_host [string]
+
+The target ftp host is required
+
+### ftp_port [int]
+
+The target ftp port is required
+
+### ftp_username [string]
+
+The target ftp username is required
+
+### ftp_password [string]
+
+The target ftp password is required
+
+### path [string]
+
+The target dir path is required.
+
+### file_name_expression [string]
+
+`file_name_expression` describes the name expression of the files that will be created in the `path`. You can use the variables `${now}` or `${uuid}` in the `file_name_expression`, like `test_${uuid}_${now}`,
+`${now}` represents the current time, and its format can be defined by specifying the option `filename_time_format`.
+
+Please note that if `is_enable_transaction` is `true`, `${transactionId}_` is automatically added to the beginning of the file name.
+
+### file_format [string]
+
+The following file types are supported:
+
+`text`
+
+Please note that the final file name ends with the suffix of the `file_format`; the suffix of a text file is `txt`.
+
+### filename_time_format [string]
+
+When the format in the `file_name_expression` parameter is `xxxx-${now}` , `filename_time_format` can specify the time format of the path, and the default value is `yyyy.MM.dd` . The commonly used time formats are listed as follows:
+
+| Symbol | Description        |
+| ------ | ------------------ |
+| y      | Year               |
+| M      | Month              |
+| d      | Day of month       |
+| H      | Hour in day (0-23) |
+| m      | Minute in hour     |
+| s      | Second in minute   |
+
+
+### field_delimiter [string]
+
+The separator between columns in a row of data. Only needed by `text` and `csv` file format.
+
+### row_delimiter [string]
+
+The separator between rows in a file. Only needed by `text` and `csv` file format.
+
+### partition_by [array]
+
+Partition data based on selected fields
+
+### partition_dir_expression [string]
+
+If the `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.
+
+Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
+
+### is_partition_field_write_in_file [boolean]
+
+If `is_partition_field_write_in_file` is `true`, the partition field and its value will be written into the data file.
+
+For example, if you want to write a Hive data file, its value should be `false`.
+
+### sink_columns [array]
+
+The columns that need to be written to the file. The default value is all of the columns obtained from `Transform` or `Source`.
+The order of the fields determines the order in which the file is actually written.
+
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is true, we will ensure that data will not be lost or duplicated when it is written to the target directory.
+
+Please note that if `is_enable_transaction` is `true`, `${transactionId}_` is automatically added to the beginning of the file name.
+
+Only support `true` now.
+
+### save_mode [string]
+
+Storage mode; currently `overwrite` is supported. This means the old file is deleted when a new file has the same name.
+
+If `is_enable_transaction` is `true`, file name collisions basically do not occur, because the transaction id is added to the file name.
+
+For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)
+
+## Example
+
+For text file format
+
+```bash
+
+FtpFile {
+    ftp_host="xxx.xxx.xxx.xxx"
+    ftp_port=21
+    ftp_username="username"
+    ftp_password="password"
+    path="/data/ftp"
+    field_delimiter="\t"
+    row_delimiter="\n"
+    partition_by=["age"]
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="text"
+    sink_columns=["name","age"]
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+}
+
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index bf9e17a38..64b008491 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -118,3 +118,4 @@ seatunnel.sink.elasticsearch = connector-elasticsearch
 seatunnel.source.IoTDB = connector-iotdb
 seatunnel.sink.IoTDB = connector-iotdb
 seatunnel.sink.Neo4j = connector-neo4j
+seatunnel.sink.FtpFile = connector-file-ftp
diff --git a/pom.xml b/pom.xml
index cf65b3073..6ee15ec06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
         <mongo-spark.version>2.2.0</mongo-spark.version>
         <spark-redis.version>2.6.0</spark-redis.version>
         <commons-lang3.version>3.4</commons-lang3.version>
+        <commons-net.version>3.6</commons-net.version>
         <kudu.version>1.11.1</kudu.version>
         <email.version>1.5.6</email.version>
         <commons-collections4.version>4.4</commons-collections4.version>
@@ -962,7 +963,14 @@
                 <version>${awaitility.version}</version>
                 <scope>test</scope>
             </dependency>
+
+            <dependency>
+                <groupId>commons-net</groupId>
+                <artifactId>commons-net</artifactId>
+                <version>${commons-net.version}</version>
+            </dependency>
         </dependencies>
+
     </dependencyManagement>
 
     <dependencies>
@@ -995,6 +1003,7 @@
             <groupId>org.junit.jupiter</groupId>
             <artifactId>junit-jupiter-params</artifactId>
         </dependency>
+
     </dependencies>
 
     <build>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index 80be3a66c..9791af172 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -101,6 +101,11 @@
             <artifactId>connector-file-local</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-ftp</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-hudi</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 03fc5a56d..7927451c9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -61,6 +61,11 @@
             <artifactId>parquet-avro</artifactId>
         </dependency>
 
+        <dependency>
+            <groupId>commons-net</groupId>
+            <artifactId>commons-net</artifactId>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.orc</groupId>
             <artifactId>orc-core</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
index 28399d25e..3156c9796 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
@@ -21,7 +21,8 @@ import java.io.Serializable;
 
 public enum FileSystemType implements Serializable {
     HDFS("HdfsFile"),
-    LOCAL("LocalFile");
+    LOCAL("LocalFile"),
+    FTP("FtpFile");
 
     private final String fileSystemPluginName;
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
index 83e51d1bc..3c460101a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
@@ -61,22 +61,22 @@ public class FileSinkWriterWithTransaction implements SinkWriter<SeaTunnelRow, F
         this.textFileSinkConfig = textFileSinkConfig;
 
         Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
-            new FileSinkTransactionFileNameGenerator(
-                this.textFileSinkConfig.getFileFormat(),
-                this.textFileSinkConfig.getFileNameExpression(),
-                this.textFileSinkConfig.getFileNameTimeFormat()),
-            new FileSinkPartitionDirNameGenerator(
-                this.textFileSinkConfig.getPartitionFieldList(),
-                this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
-                this.textFileSinkConfig.getPartitionDirExpression()),
-            this.textFileSinkConfig.getSinkColumnsIndexInRow(),
-            this.textFileSinkConfig.getTmpPath(),
-            this.textFileSinkConfig.getPath(),
-            this.jobId,
-            this.context.getIndexOfSubtask(),
-            this.textFileSinkConfig.getFieldDelimiter(),
-            this.textFileSinkConfig.getRowDelimiter(),
-            sinkFileSystemPlugin.getFileSystem().get());
+                new FileSinkTransactionFileNameGenerator(
+                        this.textFileSinkConfig.getFileFormat(),
+                        this.textFileSinkConfig.getFileNameExpression(),
+                        this.textFileSinkConfig.getFileNameTimeFormat()),
+                new FileSinkPartitionDirNameGenerator(
+                        this.textFileSinkConfig.getPartitionFieldList(),
+                        this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+                        this.textFileSinkConfig.getPartitionDirExpression()),
+                this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+                this.textFileSinkConfig.getTmpPath(),
+                this.textFileSinkConfig.getPath(),
+                this.jobId,
+                this.context.getIndexOfSubtask(),
+                this.textFileSinkConfig.getFieldDelimiter(),
+                this.textFileSinkConfig.getRowDelimiter(),
+                sinkFileSystemPlugin.getFileSystem().get());
 
         if (!transactionStateFileWriter.isPresent()) {
             throw new RuntimeException("A TransactionStateFileWriter is need");
@@ -100,22 +100,22 @@ public class FileSinkWriterWithTransaction implements SinkWriter<SeaTunnelRow, F
         this.jobId = jobId;
 
         Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
-            new FileSinkTransactionFileNameGenerator(
-                this.textFileSinkConfig.getFileFormat(),
-                this.textFileSinkConfig.getFileNameExpression(),
-                this.textFileSinkConfig.getFileNameTimeFormat()),
-            new FileSinkPartitionDirNameGenerator(
-                this.textFileSinkConfig.getPartitionFieldList(),
-                this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
-                this.textFileSinkConfig.getPartitionDirExpression()),
-            this.textFileSinkConfig.getSinkColumnsIndexInRow(),
-            this.textFileSinkConfig.getTmpPath(),
-            this.textFileSinkConfig.getPath(),
-            this.jobId,
-            this.context.getIndexOfSubtask(),
-            this.textFileSinkConfig.getFieldDelimiter(),
-            this.textFileSinkConfig.getRowDelimiter(),
-            sinkFileSystemPlugin.getFileSystem().get());
+                new FileSinkTransactionFileNameGenerator(
+                        this.textFileSinkConfig.getFileFormat(),
+                        this.textFileSinkConfig.getFileNameExpression(),
+                        this.textFileSinkConfig.getFileNameTimeFormat()),
+                new FileSinkPartitionDirNameGenerator(
+                        this.textFileSinkConfig.getPartitionFieldList(),
+                        this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+                        this.textFileSinkConfig.getPartitionDirExpression()),
+                this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+                this.textFileSinkConfig.getTmpPath(),
+                this.textFileSinkConfig.getPath(),
+                this.jobId,
+                this.context.getIndexOfSubtask(),
+                this.textFileSinkConfig.getFieldDelimiter(),
+                this.textFileSinkConfig.getRowDelimiter(),
+                sinkFileSystemPlugin.getFileSystem().get());
 
         if (!transactionStateFileWriter.isPresent()) {
             throw new RuntimeException("A TransactionStateFileWriter is need");
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/TransactionStateFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/TransactionStateFileSinkWriter.java
index 0bdad1afe..8661693e7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/TransactionStateFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/TransactionStateFileSinkWriter.java
@@ -61,22 +61,22 @@ public class TransactionStateFileSinkWriter implements SinkWriter<SeaTunnelRow,
         this.textFileSinkConfig = textFileSinkConfig;
 
         Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
-            new FileSinkTransactionFileNameGenerator(
-                this.textFileSinkConfig.getFileFormat(),
-                this.textFileSinkConfig.getFileNameExpression(),
-                this.textFileSinkConfig.getFileNameTimeFormat()),
-            new FileSinkPartitionDirNameGenerator(
-                this.textFileSinkConfig.getPartitionFieldList(),
-                this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
-                this.textFileSinkConfig.getPartitionDirExpression()),
-            this.textFileSinkConfig.getSinkColumnsIndexInRow(),
-            this.textFileSinkConfig.getTmpPath(),
-            this.textFileSinkConfig.getPath(),
-            this.jobId,
-            this.context.getIndexOfSubtask(),
-            this.textFileSinkConfig.getFieldDelimiter(),
-            this.textFileSinkConfig.getRowDelimiter(),
-            sinkFileSystemPlugin.getFileSystem().get());
+                new FileSinkTransactionFileNameGenerator(
+                        this.textFileSinkConfig.getFileFormat(),
+                        this.textFileSinkConfig.getFileNameExpression(),
+                        this.textFileSinkConfig.getFileNameTimeFormat()),
+                new FileSinkPartitionDirNameGenerator(
+                        this.textFileSinkConfig.getPartitionFieldList(),
+                        this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+                        this.textFileSinkConfig.getPartitionDirExpression()),
+                this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+                this.textFileSinkConfig.getTmpPath(),
+                this.textFileSinkConfig.getPath(),
+                this.jobId,
+                this.context.getIndexOfSubtask(),
+                this.textFileSinkConfig.getFieldDelimiter(),
+                this.textFileSinkConfig.getRowDelimiter(),
+                sinkFileSystemPlugin.getFileSystem().get());
 
         if (!transactionStateFileWriter.isPresent()) {
             throw new RuntimeException("A TransactionStateFileWriter is need");
@@ -100,22 +100,22 @@ public class TransactionStateFileSinkWriter implements SinkWriter<SeaTunnelRow,
         this.jobId = jobId;
 
         Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
-            new FileSinkTransactionFileNameGenerator(
-                this.textFileSinkConfig.getFileFormat(),
-                this.textFileSinkConfig.getFileNameExpression(),
-                this.textFileSinkConfig.getFileNameTimeFormat()),
-            new FileSinkPartitionDirNameGenerator(
-                this.textFileSinkConfig.getPartitionFieldList(),
-                this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
-                this.textFileSinkConfig.getPartitionDirExpression()),
-            this.textFileSinkConfig.getSinkColumnsIndexInRow(),
-            this.textFileSinkConfig.getTmpPath(),
-            this.textFileSinkConfig.getPath(),
-            this.jobId,
-            this.context.getIndexOfSubtask(),
-            this.textFileSinkConfig.getFieldDelimiter(),
-            this.textFileSinkConfig.getRowDelimiter(),
-            sinkFileSystemPlugin.getFileSystem().get());
+                new FileSinkTransactionFileNameGenerator(
+                        this.textFileSinkConfig.getFileFormat(),
+                        this.textFileSinkConfig.getFileNameExpression(),
+                        this.textFileSinkConfig.getFileNameTimeFormat()),
+                new FileSinkPartitionDirNameGenerator(
+                        this.textFileSinkConfig.getPartitionFieldList(),
+                        this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+                        this.textFileSinkConfig.getPartitionDirExpression()),
+                this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+                this.textFileSinkConfig.getTmpPath(),
+                this.textFileSinkConfig.getPath(),
+                this.jobId,
+                this.context.getIndexOfSubtask(),
+                this.textFileSinkConfig.getFieldDelimiter(),
+                this.textFileSinkConfig.getRowDelimiter(),
+                sinkFileSystemPlugin.getFileSystem().get());
 
         if (!transactionStateFileWriter.isPresent()) {
             throw new RuntimeException("A TransactionStateFileWriter is need");
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSystemType.java
index 58c1ba157..fd56b1a50 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSystemType.java
@@ -21,7 +21,8 @@ import java.io.Serializable;
 
 public enum FileSystemType implements Serializable {
     HDFS("HdfsFile"),
-    LOCAL("LocalFile");
+    LOCAL("LocalFile"),
+    FTP("FtpFile");
 
     private String sinkFileSystemPluginName;
 
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
similarity index 74%
copy from seatunnel-connectors-v2/connector-file/pom.xml
copy to seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
index b84010b8a..4fcba546f 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
@@ -21,17 +21,20 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-connectors-v2</artifactId>
-        <groupId>org.apache.seatunnel</groupId>
-        <version>${revision}</version>
-    </parent>
-    <modelVersion>4.0.0</modelVersion>
     <artifactId>connector-file</artifactId>
-    <packaging>pom</packaging>
+    <groupId>org.apache.seatunnel</groupId>
+    <version>${revision}</version>
+        </parent>
+
+    <modelVersion>4.0.0</modelVersion>
+    <artifactId>connector-file-ftp</artifactId>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
 
-    <modules>
-        <module>connector-file-base</module>
-        <module>connector-file-hadoop</module>
-        <module>connector-file-local</module>
-    </modules>
 </project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSink.java
new file mode 100644
index 000000000..77458eff4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.AbstractFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.config.FtpConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util.FtpFileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class FtpFileSink extends AbstractFileSink {
+
+    private String ftpHost;
+    private Integer ftpPort;
+    private String ftpUserName;
+    private String ftpPwd;
+
+    @Override
+    public SinkFileSystemPlugin getSinkFileSystemPlugin() {
+        return new FtpFileSinkPlugin();
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        super.prepare(pluginConfig);
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+                FtpConfig.FTP_HOST, FtpConfig.FTP_PORT, FtpConfig.FTP_USERNAME, FtpConfig.FTP_PASSWORD);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        } else {
+            this.ftpHost = pluginConfig.getString(FtpConfig.FTP_HOST);
+            this.ftpPort = pluginConfig.getInt(FtpConfig.FTP_PORT);
+            this.ftpUserName = pluginConfig.getString(FtpConfig.FTP_USERNAME);
+            this.ftpPwd = pluginConfig.getString(FtpConfig.FTP_PASSWORD);
+            FtpFileUtils.initFTPClient(this.ftpHost, this.ftpPort, this.ftpUserName, this.ftpPwd);
+
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSinkPlugin.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSinkPlugin.java
new file mode 100644
index 000000000..6689b2f40
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSinkPlugin.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.filesystem.FtpFileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.filesystem.FtpFileSystemCommitter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.writer.FtpTransactionStateFileWriteFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+
+import java.util.List;
+import java.util.Optional;
+
+/**
+ * FTP implementation of {@link SinkFileSystemPlugin}: hands out the FTP-backed writer, committer and file system.
+ */
+public class FtpFileSinkPlugin implements SinkFileSystemPlugin {
+
+    /** Returns the sink plugin name registered for {@link FileSystemType#FTP}. */
+    @Override
+    public String getPluginName() {
+        return FileSystemType.FTP.getSinkFileSystemPluginName();
+    }
+
+    /** Builds the transaction state file writer through {@link FtpTransactionStateFileWriteFactory}. */
+    @Override
+    public Optional<TransactionStateFileWriter> getTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+                                                                              @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+                                                                              @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+                                                                              @NonNull List<Integer> sinkColumnsIndexInRow,
+                                                                              @NonNull String tmpPath,
+                                                                              @NonNull String targetPath,
+                                                                              @NonNull String jobId,
+                                                                              int subTaskIndex,
+                                                                              @NonNull String fieldDelimiter,
+                                                                              @NonNull String rowDelimiter,
+                                                                              @NonNull FileSystem fileSystem) {
+        TransactionStateFileWriter writer = FtpTransactionStateFileWriteFactory.of(
+                seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator,
+                sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex,
+                fieldDelimiter, rowDelimiter, fileSystem);
+        return Optional.of(writer);
+    }
+
+    /** Returns a new {@link FtpFileSystemCommitter}. */
+    @Override
+    public Optional<FileSystemCommitter> getFileSystemCommitter() {
+        FileSystemCommitter committer = new FtpFileSystemCommitter();
+        return Optional.of(committer);
+    }
+
+    /** Returns a new {@link FtpFileSystem}. */
+    @Override
+    public Optional<FileSystem> getFileSystem() {
+        FileSystem ftpFileSystem = new FtpFileSystem();
+        return Optional.of(ftpFileSystem);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/config/FtpConfig.java
similarity index 64%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
copy to seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/config/FtpConfig.java
index 28399d25e..0bcce0f45 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/config/FtpConfig.java
@@ -15,21 +15,14 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.config;
 
-import java.io.Serializable;
-
-public enum FileSystemType implements Serializable {
-    HDFS("HdfsFile"),
-    LOCAL("LocalFile");
-
-    private final String fileSystemPluginName;
-
-    FileSystemType(String fileSystemPluginName) {
-        this.fileSystemPluginName = fileSystemPluginName;
-    }
-
-    public String getFileSystemPluginName() {
-        return fileSystemPluginName;
-    }
+/** Names of the options the FTP file sink reads from its plugin configuration; not meant to be instantiated. */
+public final class FtpConfig {
+    public static final String FTP_HOST = "ftp_host";
+    public static final String FTP_PORT = "ftp_port";
+    public static final String FTP_USERNAME = "ftp_username";
+    public static final String FTP_PASSWORD = "ftp_password";
+
+    private FtpConfig() {}
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystem.java
similarity index 50%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
copy to seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystem.java
index 28399d25e..83de31efb 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystem.java
@@ -15,21 +15,32 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.filesystem;
 
-import java.io.Serializable;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util.FtpFileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
 
-public enum FileSystemType implements Serializable {
-    HDFS("HdfsFile"),
-    LOCAL("LocalFile");
+import org.apache.commons.net.ftp.FTPClient;
 
-    private final String fileSystemPluginName;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
 
-    FileSystemType(String fileSystemPluginName) {
-        this.fileSystemPluginName = fileSystemPluginName;
+/** {@link FileSystem} operations of the file sink carried out on the FTP server. */
+public class FtpFileSystem implements FileSystem {
+    /** Deletes the file at {@code path} through {@link FtpFileUtils#deleteFile(String)}. */
+    @Override
+    public void deleteFile(String path) throws IOException {
+        FtpFileUtils.deleteFile(path);
     }
 
-    public String getFileSystemPluginName() {
-        return fileSystemPluginName;
+    /** Returns the names of the entries the server lists for {@code dirPath}. */
+    @Override
+    public List<String> dirList(String dirPath) throws IOException {
+        FTPClient client = FtpFileUtils.getFTPClient();
+        return Arrays.stream(client.listFiles(dirPath))
+                .map(entry -> entry.getName())
+                .collect(Collectors.toList());
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystemCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystemCommitter.java
new file mode 100644
index 000000000..37154de6f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystemCommitter.java
@@ -0,0 +1,74 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.filesystem;
+
+import org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util.FtpFileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
+
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.util.Map;
+
+/**
+ * Commits or rolls back file transactions on the FTP server by renaming files between the two paths recorded
+ * for them, using {@link FtpFileUtils}.
+ */
+public class FtpFileSystemCommitter implements FileSystemCommitter {
+
+    /**
+     * Renames every recorded file from its map key to its map value, then deletes the path stored as outer key.
+     *
+     * @param aggregateCommitInfo holds the map of outer key to (source path, target path) pairs
+     * @throws IOException if a rename fails
+     */
+    @Override
+    public void commitTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException {
+        Map<String, Map<String, String>> transactions = aggregateCommitInfo.getTransactionMap();
+        for (Map.Entry<String, Map<String, String>> transaction : transactions.entrySet()) {
+            for (Map.Entry<String, String> move : transaction.getValue().entrySet()) {
+                FtpFileUtils.renameFile(move.getKey(), move.getValue());
+            }
+            FtpFileUtils.deleteFiles(transaction.getKey());
+        }
+    }
+
+    /**
+     * Renames files back from their map value to their map key, then deletes the path stored as outer key.
+     * A file is only moved back when the value path exists and the key path does not.
+     *
+     * @param aggregateCommitInfo holds the map of outer key to (source path, target path) pairs
+     * @throws IOException if an existence check or a rename fails
+     */
+    @Override
+    public void abortTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException {
+        Map<String, Map<String, String>> transactions = aggregateCommitInfo.getTransactionMap();
+        for (Map.Entry<String, Map<String, String>> transaction : transactions.entrySet()) {
+            for (Map.Entry<String, String> move : transaction.getValue().entrySet()) {
+                String sourceFile = move.getKey();
+                String movedFile = move.getValue();
+                if (!FtpFileUtils.fileExist(movedFile) || FtpFileUtils.fileExist(sourceFile)) {
+                    continue;
+                }
+                FtpFileUtils.renameFile(movedFile, sourceFile);
+            }
+            FtpFileUtils.deleteFiles(transaction.getKey());
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/util/FtpFileUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/util/FtpFileUtils.java
new file mode 100644
index 000000000..cefa023fb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/util/FtpFileUtils.java
@@ -0,0 +1,329 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util;
+
+import lombok.NonNull;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.util.StringTokenizer;
+
+/**
+ * Static helpers that run file operations on an FTP server through one {@link FTPClient} shared by the whole JVM.
+ *
+ * <p>The client is created by {@link #initFTPClient(String, int, String, String)} and reused by every other
+ * method; this class adds no synchronization around the individual FTP commands.
+ */
+public class FtpFileUtils {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FtpFileUtils.class);
+    private static volatile FTPClient FTPCLIENT;
+    private static String FTP_PASSWORD;
+    private static String FTP_USERNAME;
+    private static String FTP_HOST;
+    private static Integer FTP_PORT;
+    private static final int FTP_CONNECT_MAX_TIMEOUT = 30000;
+
+    private FtpFileUtils() {
+    }
+
+    /**
+     * Returns the shared client, connecting with the settings of the last {@code initFTPClient} call if needed.
+     *
+     * @return the connected, logged-in client
+     * @throws IllegalStateException if {@code initFTPClient} was never called or the connection can not be set up
+     */
+    public static FTPClient getFTPClient() {
+        FTPClient client = FTPCLIENT;
+        if (client != null) {
+            return client;
+        }
+        synchronized (FtpFileUtils.class) {
+            if (FTP_HOST == null || FTP_PORT == null || FTP_USERNAME == null || FTP_PASSWORD == null) {
+                throw new IllegalStateException("FTP client is not initialized, call initFTPClient first");
+            }
+            return initFTPClient(FTP_HOST, FTP_PORT, FTP_USERNAME, FTP_PASSWORD);
+        }
+    }
+
+    /**
+     * Creates the shared client on first use; later calls return the existing client and ignore their arguments.
+     *
+     * <p>A client is only published once connect, login and the binary transfer mode all succeeded, so a failed
+     * attempt is retried by the next call instead of leaving an unusable client behind.
+     *
+     * @param ftpHost     host name or address of the FTP server
+     * @param ftpPort     port of the FTP server
+     * @param ftpUserName login name
+     * @param ftpPassword login password
+     * @return the connected, logged-in client
+     * @throws IllegalStateException if the server can not be reached or rejects the session
+     */
+    public static FTPClient initFTPClient(@NonNull String ftpHost,
+                                          int ftpPort,
+                                          @NonNull String ftpUserName,
+                                          @NonNull String ftpPassword) {
+        FTPClient client = FTPCLIENT;
+        if (client == null) {
+            synchronized (FtpFileUtils.class) {
+                client = FTPCLIENT;
+                if (client == null) {
+                    FtpFileUtils.FTP_HOST = ftpHost;
+                    FtpFileUtils.FTP_PORT = ftpPort;
+                    FtpFileUtils.FTP_USERNAME = ftpUserName;
+                    FtpFileUtils.FTP_PASSWORD = ftpPassword;
+                    client = connect(ftpHost, ftpPort, ftpUserName, ftpPassword);
+                    FTPCLIENT = client;
+                }
+            }
+        }
+        return client;
+    }
+
+    /** Opens, authenticates and configures a new client; disconnects it again when any step fails. */
+    private static FTPClient connect(String host, int port, String userName, String password) {
+        FTPClient client = new FTPClient();
+        // The connect timeout and the control encoding only take effect when set before connect().
+        client.setConnectTimeout(FTP_CONNECT_MAX_TIMEOUT);
+        client.setControlEncoding("utf-8");
+        try {
+            client.connect(host, port);
+            if (!FTPReply.isPositiveCompletion(client.getReplyCode())) {
+                throw new IOException("connection refused: " + client.getReplyString());
+            }
+            if (!client.login(userName, password)) {
+                throw new IOException("login failed for user [" + userName + "]: " + client.getReplyString());
+            }
+            client.enterLocalPassiveMode();
+            if (!client.setFileType(FTP.BINARY_FILE_TYPE)) {
+                throw new IOException("binary transfer mode rejected: " + client.getReplyString());
+            }
+            return client;
+        } catch (SocketException e) {
+            disconnectQuietly(client);
+            throw new IllegalStateException("socket error on FTP server [" + host + ":" + port + "]", e);
+        } catch (IOException e) {
+            disconnectQuietly(client);
+            throw new IllegalStateException("can not open FTP session on [" + host + ":" + port + "]", e);
+        }
+    }
+
+    /** Closes the control connection of a client that will not be used; a failure here is only logged. */
+    private static void disconnectQuietly(FTPClient client) {
+        if (!client.isConnected()) {
+            return;
+        }
+        try {
+            client.disconnect();
+        } catch (IOException e) {
+            LOGGER.warn("error when disconnect from ftp server", e);
+        }
+    }
+
+    /**
+     * Opens a stream that appends to a file on the server, creating the file and its directories first if needed.
+     *
+     * <p>Closing the returned stream also completes the transfer on the control connection; commons-net requires
+     * this before the client sends its next command.
+     *
+     * @param outFilePath path of the file on the server
+     * @return a stream writing to the end of the file
+     * @throws IOException if the server does not open the data connection
+     */
+    public static OutputStream getOutputStream(@NonNull String outFilePath) throws IOException {
+        FTPClient ftpClient = getFTPClient();
+        if (!fileExist(outFilePath)) {
+            createFile(outFilePath);
+        }
+        // The path is sent as is: the control connection already encodes commands as UTF-8.
+        OutputStream dataStream = ftpClient.appendFileStream(outFilePath);
+        if (dataStream == null) {
+            throw new IOException("can not append to ftp file [" + outFilePath + "]: " + ftpClient.getReplyString());
+        }
+        return new FtpTransferOutputStream(dataStream, ftpClient);
+    }
+
+    /**
+     * Creates a directory and all of its missing parents; the path is always resolved from the server root.
+     *
+     * @param dirPath directory path with {@code /} as separator
+     * @return {@code false} if a request failed with an exception, {@code true} otherwise
+     * @throws IOException declared for API compatibility; request failures are reported through the return value
+     */
+    public static boolean createDir(@NonNull String dirPath) throws IOException {
+        FTPClient ftpClient = getFTPClient();
+        StringTokenizer segments = new StringTokenizer(dirPath, "/");
+        StringBuilder pathName = new StringBuilder();
+        while (segments.hasMoreTokens()) {
+            pathName.append('/').append(segments.nextToken());
+            try {
+                // NOTE(review): the reply is not checked, presumably because it is also negative for existing dirs.
+                ftpClient.makeDirectory(pathName.toString());
+            } catch (Exception e) {
+                LOGGER.error("create dir [" + pathName + "] error", e);
+                return false;
+            }
+        }
+        return true;
+    }
+
+    /**
+     * Creates an empty file, together with any missing parent directory.
+     *
+     * @param filePath path of the file on the server
+     * @return whether the server stored the file
+     * @throws IOException if the upload fails
+     */
+    public static boolean createFile(@NonNull String filePath) throws IOException {
+        FTPClient ftpClient = getFTPClient();
+        createDir(parentDir(filePath));
+        InputStream emptyContent = new ByteArrayInputStream(new byte[0]);
+        return ftpClient.storeFile(filePath, emptyContent);
+    }
+
+    /** Returns the directory part of a path as an absolute path, or an empty string if there is none. */
+    private static String parentDir(String filePath) {
+        StringTokenizer segments = new StringTokenizer(filePath, "/");
+        StringBuilder dirName = new StringBuilder();
+        for (int remaining = segments.countTokens(); remaining > 1; remaining--) {
+            dirName.append('/').append(segments.nextToken());
+        }
+        return dirName.toString();
+    }
+
+    /**
+     * Tells whether the server lists at least one name for the path.
+     *
+     * @param filePath path to look up
+     * @return {@code true} if the listing is not empty
+     * @throws IOException if the listing fails
+     */
+    public static boolean fileExist(@NonNull String filePath) throws IOException {
+        String[] listNames = getFTPClient().listNames(filePath);
+        return listNames != null && listNames.length > 0;
+    }
+
+    /**
+     * Moves a file, replacing the target if it already exists and creating the target's parent directories.
+     *
+     * @param oldName current path of the file
+     * @param newName path the file is moved to
+     * @throws IOException if the server rejects the rename
+     */
+    public static void renameFile(@NonNull String oldName, @NonNull String newName) throws IOException {
+        LOGGER.info("begin rename file oldName :[{}] to newName :[{}]", oldName, newName);
+        FTPClient ftpClient = getFTPClient();
+        if (fileExist(newName)) {
+            deleteFile(newName);
+        } else {
+            // Only the directories are prepared: a placeholder file at the target would have to be overwritten
+            // by the rename, which not every FTP server allows.
+            createDir(parentDir(newName));
+        }
+        if (!ftpClient.rename(oldName, newName)) {
+            throw new IOException("rename file :[" + oldName + "] to [" + newName + "] error");
+        }
+        LOGGER.info("rename file  :[{}] to [{}] finish", oldName, newName);
+    }
+
+    /**
+     * Deletes a single file; a negative reply from the server is logged, not thrown.
+     *
+     * @param filePath path of the file to delete
+     * @throws IOException if the request can not be sent
+     */
+    public static void deleteFile(@NonNull String filePath) throws IOException {
+        if (!getFTPClient().deleteFile(filePath)) {
+            LOGGER.warn("delete file [{}] failed", filePath);
+        }
+    }
+
+    /**
+     * Deletes a directory recursively: first its files and sub-directories, then the directory itself.
+     *
+     * @param pathName path of the directory
+     * @return {@code false} as soon as an entry can not be deleted or a request fails, {@code true} otherwise
+     */
+    public static boolean deleteFiles(@NonNull String pathName) {
+        FTPClient ftpClient = getFTPClient();
+        try {
+            FTPFile[] ftpFiles = ftpClient.listFiles(pathName);
+            if (ftpFiles != null) {
+                for (FTPFile child : ftpFiles) {
+                    // Unparsable listing lines show up as null; "." and ".." would make the recursion loop.
+                    if (child == null || ".".equals(child.getName()) || "..".equals(child.getName())) {
+                        continue;
+                    }
+                    String childPath = pathName + "/" + child.getName();
+                    boolean deleted = child.isDirectory() ? deleteFiles(childPath) : ftpClient.deleteFile(childPath);
+                    if (!deleted) {
+                        return false;
+                    }
+                }
+            }
+            ftpClient.removeDirectory(pathName);
+        } catch (Exception e) {
+            LOGGER.error("delete file [" + pathName + "] error", e);
+            return false;
+        }
+        return true;
+    }
+
+    /** Data stream of an FTP upload that completes the pending transfer command when it is closed. */
+    private static final class FtpTransferOutputStream extends FilterOutputStream {
+        private final FTPClient client;
+        private boolean closed;
+
+        FtpTransferOutputStream(OutputStream dataStream, FTPClient client) {
+            super(dataStream);
+            this.client = client;
+        }
+
+        @Override
+        public void write(byte[] b, int off, int len) throws IOException {
+            // FilterOutputStream would forward the array one byte at a time.
+            out.write(b, off, len);
+        }
+
+        @Override
+        public void close() throws IOException {
+            if (closed) {
+                return;
+            }
+            closed = true;
+            boolean completed;
+            try {
+                super.close();
+            } finally {
+                completed = client.completePendingCommand();
+            }
+            if (!completed) {
+                throw new IOException("ftp transfer not completed: " + client.getReplyString());
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTransactionStateFileWriteFactory.java
new file mode 100644
index 000000000..bfd130cb7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTransactionStateFileWriteFactory.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+/**
+ * Creates the {@link TransactionStateFileWriter} used by the FTP file sink.
+ */
+public class FtpTransactionStateFileWriteFactory {
+
+    private FtpTransactionStateFileWriteFactory() {}
+
+    /**
+     * Creates a writer for the file format configured on the file name generator.
+     *
+     * <p>Only a delimited text writer exists for FTP, so CSV and every other format get the same
+     * {@link FtpTxtTransactionStateFileWriter}.
+     *
+     * @param transactionFileNameGenerator must be a {@link FileSinkTransactionFileNameGenerator}
+     * @return a new text writer configured with the given arguments
+     * @throws ClassCastException   if the generator is of another type
+     * @throws NullPointerException if the generator has no file format
+     */
+    public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+                                                @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+                                                @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+                                                @NonNull List<Integer> sinkColumnsIndexInRow,
+                                                @NonNull String tmpPath,
+                                                @NonNull String targetPath,
+                                                @NonNull String jobId,
+                                                int subTaskIndex,
+                                                @NonNull String fieldDelimiter,
+                                                @NonNull String rowDelimiter,
+                                                @NonNull FileSystem fileSystem
+                                                ) {
+        // The format is still resolved so that an unusable generator is rejected here, as it always was.
+        FileFormat fileFormat = ((FileSinkTransactionFileNameGenerator) transactionFileNameGenerator).getFileFormat();
+        if (fileFormat == null) {
+            throw new NullPointerException("fileFormat");
+        }
+        return new FtpTxtTransactionStateFileWriter(
+                    seaTunnelRowTypeInfo,
+                    transactionFileNameGenerator,
+                    partitionDirNameGenerator,
+                    sinkColumnsIndexInRow,
+                    tmpPath,
+                    targetPath,
+                    jobId,
+                    subTaskIndex,
+                    fieldDelimiter,
+                    rowDelimiter,
+                    fileSystem);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTxtTransactionStateFileWriter.java
new file mode 100644
index 000000000..b2d1786c8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTxtTransactionStateFileWriter.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util.FtpFileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.StandardCharsets;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+/**
+ * Writes rows as delimited text to files on the FTP server, keeping one open stream per file being written.
+ */
+public class FtpTxtTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+    private static final Logger LOGGER = LoggerFactory.getLogger(FtpTxtTransactionStateFileWriter.class);
+
+    private Map<String, OutputStream> beingWrittenOutputStream;
+    private final String fieldDelimiter;
+    private final String rowDelimiter;
+
+    public FtpTxtTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+                                            @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+                                            @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+                                            @NonNull List<Integer> sinkColumnsIndexInRow,
+                                            @NonNull String tmpPath,
+                                            @NonNull String targetPath,
+                                            @NonNull String jobId,
+                                            int subTaskIndex,
+                                            @NonNull String fieldDelimiter,
+                                            @NonNull String rowDelimiter,
+                                            @NonNull FileSystem fileSystem
+    ) {
+        super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+        this.fieldDelimiter = fieldDelimiter;
+        this.rowDelimiter = rowDelimiter;
+    }
+
+    /** Starts a transaction with no open streams. */
+    @Override
+    public void beginTransaction(String transactionId) {
+        this.beingWrittenOutputStream = new HashMap<>();
+    }
+
+    /** Closes the streams opened so far, so that their FTP connections are released, and forgets them. */
+    @Override
+    public void abortTransaction(String transactionId) {
+        if (beingWrittenOutputStream != null) {
+            beingWrittenOutputStream.forEach(FtpTxtTransactionStateFileWriter::closeQuietly);
+        }
+        this.beingWrittenOutputStream = new HashMap<>();
+    }
+
+    /**
+     * Appends the row as one line, encoded as UTF-8, to the file chosen for it.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if the write fails
+     */
+    @Override
+    public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+        String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+        OutputStream outputStream = getOrCreateOutputStream(filePath);
+        String line = transformRowToLine(seaTunnelRow);
+        try {
+            // An explicit charset keeps the file content independent of the JVM default encoding.
+            outputStream.write(line.getBytes(StandardCharsets.UTF_8));
+            outputStream.write(rowDelimiter.getBytes(StandardCharsets.UTF_8));
+        } catch (IOException e) {
+            LOGGER.error("write data to file {} error", filePath, e);
+            throw new RuntimeException(e);
+        }
+    }
+
+    /**
+     * Flushes and closes every open stream and registers its file in {@code needMoveFiles} with its target location.
+     *
+     * @throws RuntimeException wrapping the {@link IOException} if a flush fails
+     */
+    @Override
+    public void finishAndCloseWriteFile() {
+        for (Map.Entry<String, OutputStream> entry : beingWrittenOutputStream.entrySet()) {
+            String filePath = entry.getKey();
+            try {
+                entry.getValue().flush();
+            } catch (IOException e) {
+                LOGGER.error("error when flush file {}", filePath, e);
+                throw new RuntimeException(e);
+            } finally {
+                closeQuietly(filePath, entry.getValue());
+            }
+            needMoveFiles.put(filePath, getTargetLocation(filePath));
+        }
+    }
+
+    /** Closes a stream; a failure is logged and otherwise ignored because nothing more can be done with it. */
+    private static void closeQuietly(String filePath, OutputStream outputStream) {
+        try {
+            outputStream.close();
+        } catch (IOException e) {
+            LOGGER.error("error when close output stream {}", filePath, e);
+        }
+    }
+
+    /** Returns the stream already open for the file, or opens one through {@link FtpFileUtils} and remembers it. */
+    private OutputStream getOrCreateOutputStream(@NonNull String filePath) {
+        OutputStream outputStream = beingWrittenOutputStream.get(filePath);
+        if (outputStream == null) {
+            try {
+                outputStream = FtpFileUtils.getOutputStream(filePath);
+                beingWrittenOutputStream.put(filePath, outputStream);
+            } catch (IOException e) {
+                LOGGER.error("can not get output file stream", e);
+                throw new RuntimeException(e);
+            }
+        }
+        return outputStream;
+    }
+
+    /** Joins the sink columns of the row with the field delimiter; a null field becomes an empty string. */
+    private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
+        Object[] fields = seaTunnelRow.getFields();
+        return this.sinkColumnsIndexInRow.stream()
+                .map(index -> fields[index] == null ? "" : fields[index].toString())
+                .collect(Collectors.joining(fieldDelimiter));
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index b84010b8a..951ccdc33 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -33,5 +33,6 @@
         <module>connector-file-base</module>
         <module>connector-file-hadoop</module>
         <module>connector-file-local</module>
+        <module>connector-file-ftp</module>
     </modules>
 </project>
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index bb1745d18..d717ec093 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -357,7 +357,7 @@ The text of each license is the standard Apache 2.0 license.
      (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.5 - http://commons.apache.org/proper/commons-lang/)
      (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.6 - http://commons.apache.org/proper/commons-lang/)
      (Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.8.1 - http://commons.apache.org/proper/commons-lang/)
-     (Apache License, Version 2.0) Apache Commons Net (commons-net:commons-net:3.6 - http://commons.apache.org/proper/commons-net/)
+     (Apache License, Version 2.0) Apache Commons Net (org.apache.commons-net:commons-net:3.6 - http://commons.apache.org/proper/commons-net/)
      (Apache License, Version 2.0) Apache Commons Text (org.apache.commons:commons-text:1.3 - http://commons.apache.org/proper/commons-text/)
      (Apache License, Version 2.0) Apache HBase - Annotations (org.apache.hbase:hbase-annotations:2.0.0 - http://hbase.apache.org/hbase-annotations)
      (Apache License, Version 2.0) Apache HBase - Client (org.apache.hbase:hbase-client:2.0.0 - http://hbase.apache.org/hbase-build-configuration/hbase-client)
@@ -688,7 +688,7 @@ The text of each license is the standard Apache 2.0 license.
      (The Apache Software License, Version 2.0) Commons IO (commons-io:commons-io:2.4 - http://commons.apache.org/io/)
      (The Apache Software License, Version 2.0) Commons Lang (commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/)
      (The Apache Software License, Version 2.0) Commons Math (org.apache.commons:commons-math3:3.1.1 - http://commons.apache.org/math/)
-     (The Apache Software License, Version 2.0) Commons Net (commons-net:commons-net:3.1 - http://commons.apache.org/net/)
+     (The Apache Software License, Version 2.0) Commons Net (commons-net:commons-net:3.6 - http://commons.apache.org/net/)
      (The Apache Software License, Version 2.0) Commons Pool (commons-pool:commons-pool:1.5.4 - http://commons.apache.org/pool/)
      (The Apache Software License, Version 2.0) Commons Pool (commons-pool:commons-pool:1.6 - http://commons.apache.org/pool/)
      (The Apache Software License, Version 2.0) Converter: Moshi (com.squareup.retrofit2:converter-moshi:2.9.0 - https://github.com/square/retrofit)
diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE
index c6ae39191..ff8985575 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -356,6 +356,14 @@ framework, which can be obtained at:
   * HOMEPAGE:
     * http://commons.apache.org/logging/
 
+This product optionally depends on 'Apache Commons Net', a network
+protocol client library, which can be obtained at:
+
+  * LICENSE:
+    * license/LICENSE.commons-net.txt (Apache License 2.0)
+  * HOMEPAGE:
+    * http://commons.apache.org/proper/commons-net/
+
 This product optionally depends on 'Apache Log4J', a logging framework, which
 can be obtained at:
 
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 28756ccbf..d9eb08874 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -61,6 +61,11 @@
             <artifactId>connector-file-local</artifactId>
             <version>${project.version}</version>
         </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-ftp</artifactId>
+            <version>${project.version}</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>connector-socket</artifactId>
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 172b11d80..dad98d576 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -89,7 +89,6 @@ commons-logging-1.2.jar
 commons-math3-3.1.1.jar
 commons-math3-3.4.1.jar
 commons-math3-3.5.jar
-commons-net-3.1.jar
 commons-net-3.6.jar
 commons-pool-1.6.jar
 commons-pool2-2.0.jar