Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/23 08:08:47 UTC
[incubator-seatunnel] branch dev updated: [Feature][File connector] Support ftp file sink (#2483)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new a87e5de80 [Feature][File connector] Support ftp file sink (#2483)
a87e5de80 is described below
commit a87e5de80a4ee307e61961586726a510846c4435
Author: chessplay <27...@qq.com>
AuthorDate: Tue Aug 23 16:08:40 2022 +0800
[Feature][File connector] Support ftp file sink (#2483)
Co-authored-by: zhoulw11 <zh...@chinatelecom.cn>
---
docs/en/connector-v2/sink/FtpFile.md | 148 +++++++++++++++++
plugin-mapping.properties | 1 +
pom.xml | 9 ++
seatunnel-connectors-v2-dist/pom.xml | 5 +
.../connector-file/connector-file-base/pom.xml | 5 +
.../seatunnel/file/config/FileSystemType.java | 3 +-
.../file/sink/FileSinkWriterWithTransaction.java | 64 ++++----
.../file/sink/TransactionStateFileSinkWriter.java | 64 ++++----
.../seatunnel/file/sink/config/FileSystemType.java | 3 +-
.../{ => connector-file-ftp}/pom.xml | 25 +--
.../seatunnel/file/sink/ftp/FtpFileSink.java | 63 ++++++++
.../seatunnel/file/sink/ftp/FtpFileSinkPlugin.java | 71 ++++++++
.../seatunnel/file/sink/ftp/config/FtpConfig.java} | 22 +--
.../file/sink/ftp/filesystem/FtpFileSystem.java} | 26 +--
.../ftp/filesystem/FtpFileSystemCommitter.java | 57 +++++++
.../seatunnel/file/sink/ftp/util/FtpFileUtils.java | 179 +++++++++++++++++++++
.../FtpTransactionStateFileWriteFactory.java | 78 +++++++++
.../writer/FtpTxtTransactionStateFileWriter.java | 125 ++++++++++++++
seatunnel-connectors-v2/connector-file/pom.xml | 1 +
seatunnel-dist/release-docs/LICENSE | 4 +-
seatunnel-dist/release-docs/NOTICE | 8 +
.../seatunnel-flink-connector-v2-example/pom.xml | 5 +
tools/dependencies/known-dependencies.txt | 1 -
23 files changed, 861 insertions(+), 106 deletions(-)
diff --git a/docs/en/connector-v2/sink/FtpFile.md b/docs/en/connector-v2/sink/FtpFile.md
new file mode 100644
index 000000000..0384671c3
--- /dev/null
+++ b/docs/en/connector-v2/sink/FtpFile.md
@@ -0,0 +1,148 @@
+# FtpFile
+
+> Ftp file sink connector
+
+## Description
+
+Output data to an FTP server.
+
+
+
+| name | type | required | default value |
+|----------------------------------|---------|----------|-----------------------------------------------------------|
+| ftp_host | string | yes | - |
+| ftp_port | int | yes | - |
+| ftp_username | string | yes | - |
+| ftp_password | string | yes | - |
+| path | string | yes | - |
+| file_name_expression | string | no | "${transactionId}" |
+| file_format | string | no | "text" |
+| filename_time_format | string | no | "yyyy.MM.dd" |
+| field_delimiter | string | no | '\001' |
+| row_delimiter | string | no | "\n" |
+| partition_by | array | no | - |
+| partition_dir_expression | string | no | "\${k0}=\${v0}\/\${k1}=\${v1}\/...\/\${kn}=\${vn}\/" |
+| is_partition_field_write_in_file | boolean | no | false |
+| sink_columns | array | no | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction | boolean | no | true |
+| save_mode | string | no | "error" |
+
+### ftp_host [string]
+
+The target FTP host. Required.
+
+### ftp_port [int]
+
+The target FTP port. Required.
+
+### ftp_username [string]
+
+The target FTP username. Required.
+
+### ftp_password [string]
+
+The target FTP password. Required.
+
+### path [string]
+
+The target directory path. Required.
+
+### file_name_expression [string]
+
+`file_name_expression` describes the name pattern of the files created under `path`. You can use the variables `${now}` and `${uuid}` in `file_name_expression`, for example `test_${uuid}_${now}`.
+`${now}` represents the current time; its format can be set with the `filename_time_format` option.
+
+Note that if `is_enable_transaction` is `true`, `${transactionId}_` is automatically added to the head of the file name.
+
+### file_format [string]
+
+The following file types are supported:
+
+`text`
+
+Note that the final file name ends with the suffix of the `file_format`; the suffix of a text file is `txt`.
+
+### filename_time_format [string]
+
+When `file_name_expression` has the form `xxxx-${now}`, `filename_time_format` specifies the time format of `${now}`. The default value is `yyyy.MM.dd`. Commonly used time format symbols are listed below:
+
+| Symbol | Description |
+| ------ | ------------------ |
+| y | Year |
+| M | Month |
+| d | Day of month |
+| H | Hour in day (0-23) |
+| m | Minute in hour |
+| s | Second in minute |
+
+
+### field_delimiter [string]
+
+The separator between columns in a row of data. Only needed by the `text` and `csv` file formats.
+
+### row_delimiter [string]
+
+The separator between rows in a file. Only needed by the `text` and `csv` file formats.
+
+### partition_by [array]
+
+Partition data based on the selected fields.
+
+### partition_dir_expression [string]
+
+If `partition_by` is specified, the corresponding partition directory is generated from the partition information, and the final file is placed in that partition directory.
+
+The default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
+
+### is_partition_field_write_in_file [boolean]
+
+If `is_partition_field_write_in_file` is `true`, the partition fields and their values are written into the data file.
+
+For example, if you want to write a Hive data file, its value should be `false`.
+
+### sink_columns [array]
+
+The columns to write to the file. By default, all columns received from `Transform` or `Source` are written.
+The order of the fields determines the order in which they are actually written to the file.
+
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is `true`, data is neither lost nor duplicated when it is written to the target directory.
+
+Note that if `is_enable_transaction` is `true`, `${transactionId}_` is automatically added to the head of the file name.
+
+Only `true` is supported for now.
+
+### save_mode [string]
+
+Storage mode. Currently `overwrite` is supported, which means an existing file is deleted when a new file has the same name.
+
+If `is_enable_transaction` is `true`, file names basically never collide, because the transaction id is added to the file name.
+
+For the specific meaning of each mode, see [save-modes](https://spark.apache.org/docs/latest/sql-programming-guide.html#save-modes)
+
+## Example
+
+For text file format
+
+```bash
+
+FtpFile {
+ ftp_host="xxx.xxx.xxx.xxx"
+ ftp_port=21
+ ftp_username="username"
+ ftp_password="password"
+ path="/data/ftp"
+ field_delimiter="\t"
+ row_delimiter="\n"
+ partition_by=["age"]
+ partition_dir_expression="${k0}=${v0}"
+ is_partition_field_write_in_file=true
+ file_name_expression="${transactionId}_${now}"
+ file_format="text"
+ sink_columns=["name","age"]
+ filename_time_format="yyyy.MM.dd"
+ is_enable_transaction=true
+}
+
+```
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index bf9e17a38..64b008491 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -118,3 +118,4 @@ seatunnel.sink.elasticsearch = connector-elasticsearch
seatunnel.source.IoTDB = connector-iotdb
seatunnel.sink.IoTDB = connector-iotdb
seatunnel.sink.Neo4j = connector-neo4j
+seatunnel.sink.FtpFile = connector-file-ftp
diff --git a/pom.xml b/pom.xml
index cf65b3073..6ee15ec06 100644
--- a/pom.xml
+++ b/pom.xml
@@ -170,6 +170,7 @@
<mongo-spark.version>2.2.0</mongo-spark.version>
<spark-redis.version>2.6.0</spark-redis.version>
<commons-lang3.version>3.4</commons-lang3.version>
+ <commons-net.version>3.6</commons-net.version>
<kudu.version>1.11.1</kudu.version>
<email.version>1.5.6</email.version>
<commons-collections4.version>4.4</commons-collections4.version>
@@ -962,7 +963,14 @@
<version>${awaitility.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ <version>${commons-net.version}</version>
+ </dependency>
</dependencies>
+
</dependencyManagement>
<dependencies>
@@ -995,6 +1003,7 @@
<groupId>org.junit.jupiter</groupId>
<artifactId>junit-jupiter-params</artifactId>
</dependency>
+
</dependencies>
<build>
diff --git a/seatunnel-connectors-v2-dist/pom.xml b/seatunnel-connectors-v2-dist/pom.xml
index 80be3a66c..9791af172 100644
--- a/seatunnel-connectors-v2-dist/pom.xml
+++ b/seatunnel-connectors-v2-dist/pom.xml
@@ -101,6 +101,11 @@
<artifactId>connector-file-local</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-ftp</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-hudi</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 03fc5a56d..7927451c9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -61,6 +61,11 @@
<artifactId>parquet-avro</artifactId>
</dependency>
+ <dependency>
+ <groupId>commons-net</groupId>
+ <artifactId>commons-net</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.orc</groupId>
<artifactId>orc-core</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
index 28399d25e..3156c9796 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
@@ -21,7 +21,8 @@ import java.io.Serializable;
public enum FileSystemType implements Serializable {
HDFS("HdfsFile"),
- LOCAL("LocalFile");
+ LOCAL("LocalFile"),
+ FTP("FtpFile");
private final String fileSystemPluginName;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
index 83e51d1bc..3c460101a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/FileSinkWriterWithTransaction.java
@@ -61,22 +61,22 @@ public class FileSinkWriterWithTransaction implements SinkWriter<SeaTunnelRow, F
this.textFileSinkConfig = textFileSinkConfig;
Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
- new FileSinkTransactionFileNameGenerator(
- this.textFileSinkConfig.getFileFormat(),
- this.textFileSinkConfig.getFileNameExpression(),
- this.textFileSinkConfig.getFileNameTimeFormat()),
- new FileSinkPartitionDirNameGenerator(
- this.textFileSinkConfig.getPartitionFieldList(),
- this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
- this.textFileSinkConfig.getPartitionDirExpression()),
- this.textFileSinkConfig.getSinkColumnsIndexInRow(),
- this.textFileSinkConfig.getTmpPath(),
- this.textFileSinkConfig.getPath(),
- this.jobId,
- this.context.getIndexOfSubtask(),
- this.textFileSinkConfig.getFieldDelimiter(),
- this.textFileSinkConfig.getRowDelimiter(),
- sinkFileSystemPlugin.getFileSystem().get());
+ new FileSinkTransactionFileNameGenerator(
+ this.textFileSinkConfig.getFileFormat(),
+ this.textFileSinkConfig.getFileNameExpression(),
+ this.textFileSinkConfig.getFileNameTimeFormat()),
+ new FileSinkPartitionDirNameGenerator(
+ this.textFileSinkConfig.getPartitionFieldList(),
+ this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+ this.textFileSinkConfig.getPartitionDirExpression()),
+ this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+ this.textFileSinkConfig.getTmpPath(),
+ this.textFileSinkConfig.getPath(),
+ this.jobId,
+ this.context.getIndexOfSubtask(),
+ this.textFileSinkConfig.getFieldDelimiter(),
+ this.textFileSinkConfig.getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
if (!transactionStateFileWriter.isPresent()) {
throw new RuntimeException("A TransactionStateFileWriter is need");
@@ -100,22 +100,22 @@ public class FileSinkWriterWithTransaction implements SinkWriter<SeaTunnelRow, F
this.jobId = jobId;
Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
- new FileSinkTransactionFileNameGenerator(
- this.textFileSinkConfig.getFileFormat(),
- this.textFileSinkConfig.getFileNameExpression(),
- this.textFileSinkConfig.getFileNameTimeFormat()),
- new FileSinkPartitionDirNameGenerator(
- this.textFileSinkConfig.getPartitionFieldList(),
- this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
- this.textFileSinkConfig.getPartitionDirExpression()),
- this.textFileSinkConfig.getSinkColumnsIndexInRow(),
- this.textFileSinkConfig.getTmpPath(),
- this.textFileSinkConfig.getPath(),
- this.jobId,
- this.context.getIndexOfSubtask(),
- this.textFileSinkConfig.getFieldDelimiter(),
- this.textFileSinkConfig.getRowDelimiter(),
- sinkFileSystemPlugin.getFileSystem().get());
+ new FileSinkTransactionFileNameGenerator(
+ this.textFileSinkConfig.getFileFormat(),
+ this.textFileSinkConfig.getFileNameExpression(),
+ this.textFileSinkConfig.getFileNameTimeFormat()),
+ new FileSinkPartitionDirNameGenerator(
+ this.textFileSinkConfig.getPartitionFieldList(),
+ this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+ this.textFileSinkConfig.getPartitionDirExpression()),
+ this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+ this.textFileSinkConfig.getTmpPath(),
+ this.textFileSinkConfig.getPath(),
+ this.jobId,
+ this.context.getIndexOfSubtask(),
+ this.textFileSinkConfig.getFieldDelimiter(),
+ this.textFileSinkConfig.getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
if (!transactionStateFileWriter.isPresent()) {
throw new RuntimeException("A TransactionStateFileWriter is need");
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/TransactionStateFileSinkWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/TransactionStateFileSinkWriter.java
index 0bdad1afe..8661693e7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/TransactionStateFileSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/TransactionStateFileSinkWriter.java
@@ -61,22 +61,22 @@ public class TransactionStateFileSinkWriter implements SinkWriter<SeaTunnelRow,
this.textFileSinkConfig = textFileSinkConfig;
Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
- new FileSinkTransactionFileNameGenerator(
- this.textFileSinkConfig.getFileFormat(),
- this.textFileSinkConfig.getFileNameExpression(),
- this.textFileSinkConfig.getFileNameTimeFormat()),
- new FileSinkPartitionDirNameGenerator(
- this.textFileSinkConfig.getPartitionFieldList(),
- this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
- this.textFileSinkConfig.getPartitionDirExpression()),
- this.textFileSinkConfig.getSinkColumnsIndexInRow(),
- this.textFileSinkConfig.getTmpPath(),
- this.textFileSinkConfig.getPath(),
- this.jobId,
- this.context.getIndexOfSubtask(),
- this.textFileSinkConfig.getFieldDelimiter(),
- this.textFileSinkConfig.getRowDelimiter(),
- sinkFileSystemPlugin.getFileSystem().get());
+ new FileSinkTransactionFileNameGenerator(
+ this.textFileSinkConfig.getFileFormat(),
+ this.textFileSinkConfig.getFileNameExpression(),
+ this.textFileSinkConfig.getFileNameTimeFormat()),
+ new FileSinkPartitionDirNameGenerator(
+ this.textFileSinkConfig.getPartitionFieldList(),
+ this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+ this.textFileSinkConfig.getPartitionDirExpression()),
+ this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+ this.textFileSinkConfig.getTmpPath(),
+ this.textFileSinkConfig.getPath(),
+ this.jobId,
+ this.context.getIndexOfSubtask(),
+ this.textFileSinkConfig.getFieldDelimiter(),
+ this.textFileSinkConfig.getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
if (!transactionStateFileWriter.isPresent()) {
throw new RuntimeException("A TransactionStateFileWriter is need");
@@ -100,22 +100,22 @@ public class TransactionStateFileSinkWriter implements SinkWriter<SeaTunnelRow,
this.jobId = jobId;
Optional<TransactionStateFileWriter> transactionStateFileWriter = sinkFileSystemPlugin.getTransactionStateFileWriter(this.seaTunnelRowTypeInfo,
- new FileSinkTransactionFileNameGenerator(
- this.textFileSinkConfig.getFileFormat(),
- this.textFileSinkConfig.getFileNameExpression(),
- this.textFileSinkConfig.getFileNameTimeFormat()),
- new FileSinkPartitionDirNameGenerator(
- this.textFileSinkConfig.getPartitionFieldList(),
- this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
- this.textFileSinkConfig.getPartitionDirExpression()),
- this.textFileSinkConfig.getSinkColumnsIndexInRow(),
- this.textFileSinkConfig.getTmpPath(),
- this.textFileSinkConfig.getPath(),
- this.jobId,
- this.context.getIndexOfSubtask(),
- this.textFileSinkConfig.getFieldDelimiter(),
- this.textFileSinkConfig.getRowDelimiter(),
- sinkFileSystemPlugin.getFileSystem().get());
+ new FileSinkTransactionFileNameGenerator(
+ this.textFileSinkConfig.getFileFormat(),
+ this.textFileSinkConfig.getFileNameExpression(),
+ this.textFileSinkConfig.getFileNameTimeFormat()),
+ new FileSinkPartitionDirNameGenerator(
+ this.textFileSinkConfig.getPartitionFieldList(),
+ this.textFileSinkConfig.getPartitionFieldsIndexInRow(),
+ this.textFileSinkConfig.getPartitionDirExpression()),
+ this.textFileSinkConfig.getSinkColumnsIndexInRow(),
+ this.textFileSinkConfig.getTmpPath(),
+ this.textFileSinkConfig.getPath(),
+ this.jobId,
+ this.context.getIndexOfSubtask(),
+ this.textFileSinkConfig.getFieldDelimiter(),
+ this.textFileSinkConfig.getRowDelimiter(),
+ sinkFileSystemPlugin.getFileSystem().get());
if (!transactionStateFileWriter.isPresent()) {
throw new RuntimeException("A TransactionStateFileWriter is need");
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSystemType.java
index 58c1ba157..fd56b1a50 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSystemType.java
@@ -21,7 +21,8 @@ import java.io.Serializable;
public enum FileSystemType implements Serializable {
HDFS("HdfsFile"),
- LOCAL("LocalFile");
+ LOCAL("LocalFile"),
+ FTP("FtpFile");
private String sinkFileSystemPluginName;
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
similarity index 74%
copy from seatunnel-connectors-v2/connector-file/pom.xml
copy to seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
index b84010b8a..4fcba546f 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/pom.xml
@@ -21,17 +21,20 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <artifactId>seatunnel-connectors-v2</artifactId>
- <groupId>org.apache.seatunnel</groupId>
- <version>${revision}</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
<artifactId>connector-file</artifactId>
- <packaging>pom</packaging>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+
+ <modelVersion>4.0.0</modelVersion>
+ <artifactId>connector-file-ftp</artifactId>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
- <modules>
- <module>connector-file-base</module>
- <module>connector-file-hadoop</module>
- <module>connector-file-local</module>
- </modules>
</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSink.java
new file mode 100644
index 000000000..77458eff4
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSink.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.AbstractFileSink;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.config.FtpConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util.FtpFileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class FtpFileSink extends AbstractFileSink {
+
+ private String ftpHost;
+ private Integer ftpPort;
+ private String ftpUserName;
+ private String ftpPwd;
+
+ @Override
+ public SinkFileSystemPlugin getSinkFileSystemPlugin() {
+ return new FtpFileSinkPlugin();
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ super.prepare(pluginConfig);
+ CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+ FtpConfig.FTP_HOST, FtpConfig.FTP_PORT, FtpConfig.FTP_USERNAME, FtpConfig.FTP_PASSWORD);
+ if (!result.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+ } else {
+ this.ftpHost = pluginConfig.getString(FtpConfig.FTP_HOST);
+ this.ftpPort = pluginConfig.getInt(FtpConfig.FTP_PORT);
+ this.ftpUserName = pluginConfig.getString(FtpConfig.FTP_USERNAME);
+ this.ftpPwd = pluginConfig.getString(FtpConfig.FTP_PASSWORD);
+ FtpFileUtils.initFTPClient(this.ftpHost, this.ftpPort, this.ftpUserName, this.ftpPwd);
+
+ }
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSinkPlugin.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSinkPlugin.java
new file mode 100644
index 000000000..6689b2f40
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/FtpFileSinkPlugin.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.filesystem.FtpFileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.filesystem.FtpFileSystemCommitter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.writer.FtpTransactionStateFileWriteFactory;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.SinkFileSystemPlugin;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+
+import java.util.List;
+import java.util.Optional;
+
+public class FtpFileSinkPlugin implements SinkFileSystemPlugin {
+
+ @Override
+ public String getPluginName() {
+ return FileSystemType.FTP.getSinkFileSystemPluginName();
+ }
+
+ @Override
+ public Optional<TransactionStateFileWriter> getTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+ @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer> sinkColumnsIndexInRow,
+ @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull String fieldDelimiter,
+ @NonNull String rowDelimiter,
+ @NonNull FileSystem fileSystem) {
+ return Optional.of(FtpTransactionStateFileWriteFactory.of(seaTunnelRowTypeInfo,
+ transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow,
+ tmpPath, targetPath, jobId, subTaskIndex,
+ fieldDelimiter, rowDelimiter, fileSystem));
+ }
+
+ @Override
+ public Optional<FileSystemCommitter> getFileSystemCommitter() {
+ return Optional.of(new FtpFileSystemCommitter());
+ }
+
+ @Override
+ public Optional<FileSystem> getFileSystem() {
+ return Optional.of(new FtpFileSystem());
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/config/FtpConfig.java
similarity index 64%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
copy to seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/config/FtpConfig.java
index 28399d25e..0bcce0f45 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/config/FtpConfig.java
@@ -15,21 +15,11 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.config;
-import java.io.Serializable;
-
-public enum FileSystemType implements Serializable {
- HDFS("HdfsFile"),
- LOCAL("LocalFile");
-
- private final String fileSystemPluginName;
-
- FileSystemType(String fileSystemPluginName) {
- this.fileSystemPluginName = fileSystemPluginName;
- }
-
- public String getFileSystemPluginName() {
- return fileSystemPluginName;
- }
+public class FtpConfig {
+ public static final String FTP_PASSWORD = "ftp_password";
+ public static final String FTP_USERNAME = "ftp_username";
+ public static final String FTP_HOST = "ftp_host";
+ public static final String FTP_PORT = "ftp_port";
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystem.java
similarity index 50%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
copy to seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystem.java
index 28399d25e..83de31efb 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystem.java
@@ -15,21 +15,27 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.filesystem;
-import java.io.Serializable;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util.FtpFileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
-public enum FileSystemType implements Serializable {
- HDFS("HdfsFile"),
- LOCAL("LocalFile");
+import org.apache.commons.net.ftp.FTPClient;
- private final String fileSystemPluginName;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Collectors;
- FileSystemType(String fileSystemPluginName) {
- this.fileSystemPluginName = fileSystemPluginName;
+public class FtpFileSystem implements FileSystem {
+ @Override
+ public void deleteFile(String path) throws IOException {
+ FtpFileUtils.deleteFile(path);
}
- public String getFileSystemPluginName() {
- return fileSystemPluginName;
+ @Override
+ public List<String> dirList(String dirPath) throws IOException {
+ FTPClient ftpClient = FtpFileUtils.getFTPClient();
+ return Arrays.stream(ftpClient.listFiles(dirPath)).map(dir -> dir.getName()).collect(Collectors.toList());
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystemCommitter.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystemCommitter.java
new file mode 100644
index 000000000..37154de6f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/filesystem/FtpFileSystemCommitter.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.filesystem;
+
+import org.apache.seatunnel.connectors.seatunnel.file.sink.FileAggregatedCommitInfo;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util.FtpFileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystemCommitter;
+
+import lombok.NonNull;
+
+import java.io.IOException;
+import java.util.Map;
+
+public class FtpFileSystemCommitter implements FileSystemCommitter {
+ @Override
+ public void commitTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException {
+ for (Map.Entry<String, Map<String, String>> entry :
+ aggregateCommitInfo.getTransactionMap().entrySet()) {
+ for (Map.Entry<String, String> mvFileEntry : entry.getValue().entrySet()) {
+ String key = mvFileEntry.getKey();
+ String value = mvFileEntry.getValue();
+ FtpFileUtils.renameFile(key, value);
+ }
+ FtpFileUtils.deleteFiles(entry.getKey());
+ }
+ }
+
+ @Override
+ public void abortTransaction(@NonNull FileAggregatedCommitInfo aggregateCommitInfo) throws IOException {
+ for (Map.Entry<String, Map<String, String>> entry :
+ aggregateCommitInfo.getTransactionMap().entrySet()) {
+ for (Map.Entry<String, String> mvFileEntry : entry.getValue().entrySet()) {
+ String oldFile = mvFileEntry.getKey();
+ String newFile = mvFileEntry.getValue();
+ if (FtpFileUtils.fileExist(newFile) && !FtpFileUtils.fileExist(oldFile)) {
+ FtpFileUtils.renameFile(mvFileEntry.getValue(), mvFileEntry.getKey());
+ }
+ }
+ FtpFileUtils.deleteFiles(entry.getKey());
+ }
+ }
+}
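
Note on the committer above: commitTransaction moves every temporary file to its target, and abortTransaction moves back only those files that already reached their target. The sketch below replays that logic against an in-memory set of paths, using only the JDK. It is an illustration under stated assumptions, not the connector's code: the class CommitSketch, its files set and the example paths are hypothetical, and the directory cleanup done by FtpFileUtils.deleteFiles is left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

/** Hypothetical in-memory stand-in for the FTP server; not part of the commit. */
final class CommitSketch {
    // Paths that currently "exist" on the pretend server.
    static final Set<String> files = new TreeSet<>();

    // Stand-in for FtpFileUtils.renameFile: fails if the source is missing.
    static void rename(String from, String to) {
        if (!files.remove(from)) {
            throw new IllegalStateException("missing source file " + from);
        }
        files.add(to);
    }

    // Same loop shape as commitTransaction: temp file -> target file.
    static void commit(Map<String, Map<String, String>> transactionMap) {
        for (Map<String, String> moves : transactionMap.values()) {
            moves.forEach(CommitSketch::rename);
        }
    }

    // Same guard as abortTransaction: undo only moves that already happened.
    static void abort(Map<String, Map<String, String>> transactionMap) {
        for (Map<String, String> moves : transactionMap.values()) {
            moves.forEach((tmp, target) -> {
                if (files.contains(target) && !files.contains(tmp)) {
                    rename(target, tmp);
                }
            });
        }
    }

    public static void main(String[] args) {
        files.add("/tmp/T_1/a.txt");
        Map<String, String> moves = new LinkedHashMap<>();
        moves.put("/tmp/T_1/a.txt", "/out/a.txt");
        Map<String, Map<String, String>> tx = new LinkedHashMap<>();
        tx.put("/tmp/T_1", moves);

        commit(tx);
        System.out.println(files); // the file now lives under /out
        abort(tx);
        System.out.println(files); // the file is back under /tmp/T_1
    }
}
```

The guard in abort is what makes it safe to call after a partial commit: files that never moved are left where they are.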
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/util/FtpFileUtils.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/util/FtpFileUtils.java
new file mode 100644
index 000000000..cefa023fb
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/util/FtpFileUtils.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util;
+
+import lombok.NonNull;
+import org.apache.commons.net.ftp.FTP;
+import org.apache.commons.net.ftp.FTPClient;
+import org.apache.commons.net.ftp.FTPFile;
+import org.apache.commons.net.ftp.FTPReply;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.net.SocketException;
+import java.util.StringTokenizer;
+
+public class FtpFileUtils {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FtpFileUtils.class);
+ private static volatile FTPClient FTPCLIENT;
+ private static String FTP_PASSWORD;
+ private static String FTP_USERNAME;
+ private static String FTP_HOST;
+ private static Integer FTP_PORT;
+ private static final int FTP_CONNECT_MAX_TIMEOUT = 30000;
+
+ public static FTPClient getFTPClient(){
+ return initFTPClient(FTP_HOST, FTP_PORT, FTP_USERNAME, FTP_PASSWORD);
+ }
+
+ public static FTPClient initFTPClient(@NonNull String ftpHost,
+ int ftpPort,
+ @NonNull String ftpUserName,
+ @NonNull String ftpPassword) {
+ if (null == FTPCLIENT) {
+ synchronized (FtpFileUtils.class) {
+ if (null == FTPCLIENT) {
+ FtpFileUtils.FTP_HOST = ftpHost;
+ FtpFileUtils.FTP_PORT = ftpPort;
+ FtpFileUtils.FTP_USERNAME = ftpUserName;
+ FtpFileUtils.FTP_PASSWORD = ftpPassword;
+ try {
+ FTPCLIENT = new FTPClient();
+ FTPCLIENT.setConnectTimeout(FTP_CONNECT_MAX_TIMEOUT); // only effective before connect()
+ FTPCLIENT.setControlEncoding("utf-8"); // must also be set before connect()
+ FTPCLIENT.connect(ftpHost, ftpPort);
+ FTPCLIENT.login(ftpUserName, ftpPassword);
+ FTPCLIENT.enterLocalPassiveMode();
+ FTPCLIENT.setFileType(FTP.BINARY_FILE_TYPE);
+ if (!FTPReply.isPositiveCompletion(FTPCLIENT.getReplyCode())) {
+ FTPCLIENT.disconnect();
+ }
+ } catch (SocketException e) {
+ LOGGER.error("connect to ftp server [" + ftpHost + ":" + ftpPort + "] error", e);
+ } catch (IOException e) {
+ LOGGER.error("init ftp client for [" + ftpHost + ":" + ftpPort + "] error", e);
+ }
+ }
+ }
+ }
+ return FTPCLIENT;
+ }
+
+ public static OutputStream getOutputStream(@NonNull String outFilePath) throws IOException {
+ FTPClient ftpClient = FtpFileUtils.getFTPClient();
+ if (!fileExist(outFilePath)) {
+ createFile(outFilePath);
+ }
+ OutputStream outputStream = ftpClient.appendFileStream(new String(outFilePath.getBytes("UTF-8"), "iso-8859-1"));
+ return outputStream;
+ }
+
+ public static boolean createDir(@NonNull String dirPath) throws IOException {
+ FTPClient ftpClient = getFTPClient();
+ // Walk the path one segment at a time so every missing parent directory is created.
+ StringTokenizer s = new StringTokenizer(dirPath, "/");
+ String pathName = "";
+ while (s.hasMoreElements()) {
+ pathName = pathName + "/" + s.nextElement();
+ try {
+ ftpClient.makeDirectory(pathName);
+ } catch (Exception e) {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ public static boolean createFile(@NonNull String filePath) throws IOException {
+ FTPClient ftpClient = getFTPClient();
+ StringTokenizer s = new StringTokenizer(filePath, "/"); // sign
+ int c = s.countTokens();
+ String dirName = "";
+ for (int i = 0; i < c - 1; i++) {
+ dirName = dirName + "/" + s.nextElement();
+ }
+ createDir(dirName);
+ byte[] b = "".getBytes();
+ InputStream inputStream = new ByteArrayInputStream(b);
+ ftpClient.storeFile(filePath, inputStream);
+ return true;
+ }
+
+ public static boolean fileExist(@NonNull String filePath) throws IOException {
+ FTPClient ftpClient = getFTPClient();
+ String[] listNames = ftpClient.listNames(filePath);
+ if (listNames != null) {
+ return listNames.length > 0;
+ }
+ else {
+ return false;
+ }
+ }
+
+ public static void renameFile(@NonNull String oldName, @NonNull String newName) throws IOException {
+ LOGGER.info("begin rename file oldName :[" + oldName + "] to newName :[" + newName + "]");
+ FTPClient ftpClient = getFTPClient();
+ if (!FtpFileUtils.fileExist(newName)) {
+ FtpFileUtils.createFile(newName);
+ } else {
+ FtpFileUtils.deleteFile(newName);
+ }
+ if (ftpClient.rename(oldName, newName)) {
+ LOGGER.info("rename file :[" + oldName + "] to [" + newName + "] finish");
+ } else {
+ throw new IOException("rename file :[" + oldName + "] to [" + newName + "] error");
+ }
+ }
+
+ public static void deleteFile(@NonNull String filePath) throws IOException {
+ FTPClient ftpClient = getFTPClient();
+ ftpClient.deleteFile(filePath);
+ }
+
+ public static boolean deleteFiles(@NonNull String pathName) {
+ FTPClient ftpClient = getFTPClient();
+ try {
+ FTPFile[] ftpFiles = ftpClient.listFiles(pathName);
+ if (null != ftpFiles && ftpFiles.length > 0) {
+ for (int i = 0; i < ftpFiles.length; i++) {
+ FTPFile thisFile = ftpFiles[i];
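+ if (thisFile.isDirectory()) {
+ // The recursive call removes the subdirectory itself once it is empty.
+ if (!deleteFiles(pathName + "/" + thisFile.getName())) {
+ return false;
+ }
+ } else {
+ // Delete the listed entry, not the directory that is being emptied.
+ if (!ftpClient.deleteFile(pathName + "/" + thisFile.getName())) {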
+ return false;
+ }
+ }
+ }
+ }
+ ftpClient.removeDirectory(pathName);
+ } catch (Exception e) {
+ LOGGER.error("delete file [" + pathName + "] error", e);
+ return false;
+ }
+ return true;
+ }
+}
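
Note on createDir and createFile above: both split the path on "/" and rebuild it segment by segment, so a path such as /tmp/seatunnel/T_1 yields one makeDirectory call per prefix, and createFile treats everything except the last segment as the parent directory. The sketch below reproduces only that string handling with the JDK, so it can be run without an FTP server. PathPrefixSketch and its method names are hypothetical and do not exist in the commit.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.StringTokenizer;

/** Hypothetical helper that mirrors the path handling in FtpFileUtils. */
final class PathPrefixSketch {
    // Every directory prefix that createDir would pass to makeDirectory, in order.
    static List<String> prefixes(String dirPath) {
        List<String> result = new ArrayList<>();
        StringTokenizer tokens = new StringTokenizer(dirPath, "/");
        StringBuilder current = new StringBuilder();
        while (tokens.hasMoreTokens()) {
            current.append('/').append(tokens.nextToken());
            result.add(current.toString());
        }
        return result;
    }

    // The parent directory createFile derives: all segments except the last.
    static String parentDir(String filePath) {
        StringTokenizer tokens = new StringTokenizer(filePath, "/");
        int count = tokens.countTokens();
        StringBuilder dir = new StringBuilder();
        for (int i = 0; i < count - 1; i++) {
            dir.append('/').append(tokens.nextToken());
        }
        return dir.toString();
    }

    public static void main(String[] args) {
        System.out.println(prefixes("/tmp/seatunnel/T_1")); // [/tmp, /tmp/seatunnel, /tmp/seatunnel/T_1]
        System.out.println(parentDir("/out/part/a.txt"));   // /out/part
    }
}
```

One consequence worth knowing: because every segment is prefixed with "/", a relative input such as a/b is rebuilt as /a and /a/b, so paths are effectively treated as absolute.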
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTransactionStateFileWriteFactory.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTransactionStateFileWriteFactory.java
new file mode 100644
index 000000000..bfd130cb7
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTransactionStateFileWriteFactory.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.FileSinkTransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+
+import java.util.List;
+
+public class FtpTransactionStateFileWriteFactory {
+
+ private FtpTransactionStateFileWriteFactory() {}
+
+ public static TransactionStateFileWriter of(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+ @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer> sinkColumnsIndexInRow,
+ @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull String fieldDelimiter,
+ @NonNull String rowDelimiter,
+ @NonNull FileSystem fileSystem
+ ) {
+ FileSinkTransactionFileNameGenerator fileSinkTransactionFileNameGenerator = (FileSinkTransactionFileNameGenerator) transactionFileNameGenerator;
+ FileFormat fileFormat = fileSinkTransactionFileNameGenerator.getFileFormat();
+ if (fileFormat.equals(FileFormat.CSV)) {
+ return new FtpTxtTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fieldDelimiter,
+ rowDelimiter,
+ fileSystem);
+ }
+ // For any file format the FTP connector does not support, fall back to the txt writer.
+ return new FtpTxtTransactionStateFileWriter(
+ seaTunnelRowTypeInfo,
+ transactionFileNameGenerator,
+ partitionDirNameGenerator,
+ sinkColumnsIndexInRow,
+ tmpPath,
+ targetPath,
+ jobId,
+ subTaskIndex,
+ fieldDelimiter,
+ rowDelimiter,
+ fileSystem);
+ }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTxtTransactionStateFileWriter.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTxtTransactionStateFileWriter.java
new file mode 100644
index 000000000..b2d1786c8
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/ftp/writer/FtpTxtTransactionStateFileWriter.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.writer;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.ftp.util.FtpFileUtils;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.spi.FileSystem;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.transaction.TransactionFileNameGenerator;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.AbstractTransactionStateFileWriter;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.writer.PartitionDirNameGenerator;
+
+import lombok.NonNull;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class FtpTxtTransactionStateFileWriter extends AbstractTransactionStateFileWriter {
+ private static final Logger LOGGER = LoggerFactory.getLogger(FtpTxtTransactionStateFileWriter.class);
+
+ private Map<String, OutputStream> beingWrittenOutputStream;
+ private String fieldDelimiter;
+ private String rowDelimiter;
+
+ public FtpTxtTransactionStateFileWriter(@NonNull SeaTunnelRowType seaTunnelRowTypeInfo,
+ @NonNull TransactionFileNameGenerator transactionFileNameGenerator,
+ @NonNull PartitionDirNameGenerator partitionDirNameGenerator,
+ @NonNull List<Integer> sinkColumnsIndexInRow,
+ @NonNull String tmpPath,
+ @NonNull String targetPath,
+ @NonNull String jobId,
+ int subTaskIndex,
+ @NonNull String fieldDelimiter,
+ @NonNull String rowDelimiter,
+ @NonNull FileSystem fileSystem
+ ) {
+ super(seaTunnelRowTypeInfo, transactionFileNameGenerator, partitionDirNameGenerator, sinkColumnsIndexInRow, tmpPath, targetPath, jobId, subTaskIndex, fileSystem);
+ this.fieldDelimiter = fieldDelimiter;
+ this.rowDelimiter = rowDelimiter;
+ }
+
+ @Override
+ public void beginTransaction(String transactionId) {
+ this.beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void abortTransaction(String transactionId) {
+ this.beingWrittenOutputStream = new HashMap<>();
+ }
+
+ @Override
+ public void write(@NonNull SeaTunnelRow seaTunnelRow) {
+ String filePath = getOrCreateFilePathBeingWritten(seaTunnelRow);
+ OutputStream outputStream = getOrCreateOutputStream(filePath);
+ String line = transformRowToLine(seaTunnelRow);
+ try {
+ outputStream.write(line.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ outputStream.write(rowDelimiter.getBytes(java.nio.charset.StandardCharsets.UTF_8));
+ } catch (IOException e) {
+ LOGGER.error("write data to file {} error", filePath);
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void finishAndCloseWriteFile() {
+ beingWrittenOutputStream.entrySet().forEach(entry -> {
+ try {
+ entry.getValue().flush();
+ } catch (IOException e) {
+ LOGGER.error("error when flush file {}", entry.getKey());
+ throw new RuntimeException(e);
+ } finally {
+ try {
+ entry.getValue().close();
+ } catch (IOException e) {
+ LOGGER.error("error when close output stream {}", entry.getKey());
+ }
+ }
+ needMoveFiles.put(entry.getKey(), getTargetLocation(entry.getKey()));
+ });
+ }
+
+ private OutputStream getOrCreateOutputStream(@NonNull String filePath) {
+ OutputStream outputStream = beingWrittenOutputStream.get(filePath);
+ if (outputStream == null) {
+ try {
+ outputStream = FtpFileUtils.getOutputStream(filePath);
+ beingWrittenOutputStream.put(filePath, outputStream);
+ } catch (IOException e) {
+ LOGGER.error("can not get output stream for file {}", filePath, e);
+ throw new RuntimeException(e);
+ }
+ }
+ return outputStream;
+ }
+
+ private String transformRowToLine(@NonNull SeaTunnelRow seaTunnelRow) {
+ return this.sinkColumnsIndexInRow.stream().map(index ->
+ seaTunnelRow.getFields()[index] == null ? "" : seaTunnelRow.getFields()[index].toString())
+ .collect(Collectors.joining(fieldDelimiter));
+ }
+}
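
Note on transformRowToLine above: the writer selects the configured column indexes from the row, renders a null field as an empty string, and joins the values with the field delimiter. The sketch below shows the same stream pipeline on a plain Object[] so it runs with the JDK alone. RowLineSketch and toLine are hypothetical names, and the Object[] stands in for SeaTunnelRow.getFields().

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

/** Hypothetical stand-alone version of the row-to-line conversion. */
final class RowLineSketch {
    // Join the selected columns; null fields become empty strings.
    static String toLine(Object[] fields, List<Integer> sinkColumns, String fieldDelimiter) {
        return sinkColumns.stream()
                .map(index -> fields[index] == null ? "" : fields[index].toString())
                .collect(Collectors.joining(fieldDelimiter));
    }

    public static void main(String[] args) {
        Object[] fields = {"a", null, 3, "not written"};
        // Only columns 0, 1 and 2 are sink columns in this example.
        System.out.println(toLine(fields, Arrays.asList(0, 1, 2), ",")); // a,,3
    }
}
```

Values are written as-is, so a field that itself contains the delimiter is not quoted or escaped by this conversion.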
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index b84010b8a..951ccdc33 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -33,5 +33,6 @@
<module>connector-file-base</module>
<module>connector-file-hadoop</module>
<module>connector-file-local</module>
+ <module>connector-file-ftp</module>
</modules>
</project>
\ No newline at end of file
diff --git a/seatunnel-dist/release-docs/LICENSE b/seatunnel-dist/release-docs/LICENSE
index bb1745d18..d717ec093 100644
--- a/seatunnel-dist/release-docs/LICENSE
+++ b/seatunnel-dist/release-docs/LICENSE
@@ -357,7 +357,7 @@ The text of each license is the standard Apache 2.0 license.
(Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.5 - http://commons.apache.org/proper/commons-lang/)
(Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.6 - http://commons.apache.org/proper/commons-lang/)
(Apache License, Version 2.0) Apache Commons Lang (org.apache.commons:commons-lang3:3.8.1 - http://commons.apache.org/proper/commons-lang/)
- (Apache License, Version 2.0) Apache Commons Net (commons-net:commons-net:3.6 - http://commons.apache.org/proper/commons-net/)
+ (Apache License, Version 2.0) Apache Commons Net (org.apache.commons-net:commons-net:3.6 - http://commons.apache.org/proper/commons-net/)
(Apache License, Version 2.0) Apache Commons Text (org.apache.commons:commons-text:1.3 - http://commons.apache.org/proper/commons-text/)
(Apache License, Version 2.0) Apache HBase - Annotations (org.apache.hbase:hbase-annotations:2.0.0 - http://hbase.apache.org/hbase-annotations)
(Apache License, Version 2.0) Apache HBase - Client (org.apache.hbase:hbase-client:2.0.0 - http://hbase.apache.org/hbase-build-configuration/hbase-client)
@@ -688,7 +688,7 @@ The text of each license is the standard Apache 2.0 license.
(The Apache Software License, Version 2.0) Commons IO (commons-io:commons-io:2.4 - http://commons.apache.org/io/)
(The Apache Software License, Version 2.0) Commons Lang (commons-lang:commons-lang:2.6 - http://commons.apache.org/lang/)
(The Apache Software License, Version 2.0) Commons Math (org.apache.commons:commons-math3:3.1.1 - http://commons.apache.org/math/)
- (The Apache Software License, Version 2.0) Commons Net (commons-net:commons-net:3.1 - http://commons.apache.org/net/)
+ (The Apache Software License, Version 2.0) Commons Net (commons-net:commons-net:3.6 - http://commons.apache.org/net/)
(The Apache Software License, Version 2.0) Commons Pool (commons-pool:commons-pool:1.5.4 - http://commons.apache.org/pool/)
(The Apache Software License, Version 2.0) Commons Pool (commons-pool:commons-pool:1.6 - http://commons.apache.org/pool/)
(The Apache Software License, Version 2.0) Converter: Moshi (com.squareup.retrofit2:converter-moshi:2.9.0 - https://github.com/square/retrofit)
diff --git a/seatunnel-dist/release-docs/NOTICE b/seatunnel-dist/release-docs/NOTICE
index c6ae39191..ff8985575 100644
--- a/seatunnel-dist/release-docs/NOTICE
+++ b/seatunnel-dist/release-docs/NOTICE
@@ -356,6 +356,14 @@ framework, which can be obtained at:
* HOMEPAGE:
* http://commons.apache.org/logging/
+This product optionally depends on 'Apache Commons Net', a net
+framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-net.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * http://commons.apache.org/commons-net/
+
This product optionally depends on 'Apache Log4J', a logging framework, which
can be obtained at:
diff --git a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
index 28756ccbf..d9eb08874 100644
--- a/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-connector-v2-example/pom.xml
@@ -61,6 +61,11 @@
<artifactId>connector-file-local</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-file-ftp</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-socket</artifactId>
diff --git a/tools/dependencies/known-dependencies.txt b/tools/dependencies/known-dependencies.txt
index 172b11d80..dad98d576 100755
--- a/tools/dependencies/known-dependencies.txt
+++ b/tools/dependencies/known-dependencies.txt
@@ -89,7 +89,6 @@ commons-logging-1.2.jar
commons-math3-3.1.1.jar
commons-math3-3.4.1.jar
commons-math3-3.5.jar
-commons-net-3.1.jar
commons-net-3.6.jar
commons-pool-1.6.jar
commons-pool2-2.0.jar