You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ga...@apache.org on 2023/04/28 05:44:56 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-e2e] add sftp e2e test (#4623)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2e8b28d15 [Feature][Connector-e2e] add sftp e2e test (#4623)
2e8b28d15 is described below

commit 2e8b28d1525ad7a50371409bb0701937ab4fe625
Author: monster <60...@users.noreply.github.com>
AuthorDate: Fri Apr 28 13:44:50 2023 +0800

    [Feature][Connector-e2e] add sftp e2e test (#4623)
    
    * [Feature][Connector-e2e] add sftp e2e test
---
 .../connector-file-sftp-e2e/pom.xml                |  46 +++++++
 .../e2e/connector/file/fstp/SftpFileIT.java        | 142 +++++++++++++++++++++
 .../src/test/resources/excel/e2e.xlsx              | Bin 0 -> 5823 bytes
 .../resources/excel/fakesource_to_sftp_excel.conf  |  89 +++++++++++++
 .../excel/sftp_excel_projection_to_assert.conf     | 115 +++++++++++++++++
 .../test/resources/excel/sftp_excel_to_assert.conf | 135 ++++++++++++++++++++
 .../src/test/resources/json/e2e.json               |   5 +
 .../resources/json/fake_to_sftp_file_json.conf     |  86 +++++++++++++
 .../resources/json/sftp_file_json_to_assert.conf   | 136 ++++++++++++++++++++
 .../src/test/resources/text/e2e.txt                |   5 +
 .../resources/text/fake_to_sftp_file_text.conf     |  87 +++++++++++++
 .../text/sftp_file_text_projection_to_assert.conf  | 137 ++++++++++++++++++++
 .../text/sftp_file_text_skip_headers.conf          | 137 ++++++++++++++++++++
 .../resources/text/sftp_file_text_to_assert.conf   | 136 ++++++++++++++++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 15 files changed, 1257 insertions(+)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/pom.xml
new file mode 100644
index 000000000..8ff705171
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/pom.xml
@@ -0,0 +1,46 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.seatunnel</groupId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
+        <version>${revision}</version>
+    </parent>
+
+    <artifactId>connector-file-sftp-e2e</artifactId>
+    <name>SeaTunnel : E2E : Connector V2 : File Sftp</name>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-fake</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-sftp</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-assert</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
+
+</project>
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
new file mode 100644
index 000000000..82d1be73d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/fstp/SftpFileIT.java
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.file.fstp;
+
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.stream.Stream;
+
+@DisabledOnContainer(
+        value = {TestContainerId.SPARK_2_4},
+        disabledReason = "The apache-compress version is not compatible with apache-poi")
+@Slf4j
+public class SftpFileIT extends TestSuiteBase implements TestResource {
+
+    private static final String SFTP_IMAGE = "atmoz/sftp:latest";
+
+    private static final String SFTP_CONTAINER_HOST = "sftp";
+
+    private static final int SFTP_PORT = 22;
+
+    private static final int SFTP_BIND_PORT = 2222;
+
+    private static final String USERNAME = "seatunnel";
+
+    private static final String PASSWORD = "pass";
+
+    private GenericContainer<?> sftpContainer;
+
+    @BeforeAll
+    @Override
+    public void startUp() throws Exception {
+        sftpContainer =
+                new GenericContainer<>(SFTP_IMAGE)
+                        .withEnv("SFTP_USERS", USERNAME + ":" + PASSWORD)
+                        .withCommand(USERNAME + ":" + PASSWORD + ":::tmp")
+                        .withNetwork(NETWORK)
+                        .withNetworkAliases(SFTP_CONTAINER_HOST)
+                        .withExposedPorts(SFTP_PORT);
+
+        sftpContainer.setPortBindings(Collections.singletonList(SFTP_BIND_PORT + ":" + SFTP_PORT));
+        sftpContainer.start();
+        Startables.deepStart(Stream.of(sftpContainer)).join();
+        log.info("Sftp container started");
+        Path jsonPath = ContainerUtil.getResourcesFile("/json/e2e.json").toPath();
+        Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath();
+        Path excelPath = ContainerUtil.getResourcesFile("/excel/e2e.xlsx").toPath();
+        sftpContainer.copyFileToContainer(
+                MountableFile.forHostPath(jsonPath),
+                "/home/seatunnel/tmp/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json");
+        sftpContainer.copyFileToContainer(
+                MountableFile.forHostPath(textPath),
+                "/home/seatunnel/tmp/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt");
+        sftpContainer.copyFileToContainer(
+                MountableFile.forHostPath(excelPath),
+                "/home/seatunnel/tmp/seatunnel/read/excel/name=tyrantlucifer/hobby=coding/e2e.xlsx");
+        sftpContainer.execInContainer("sh", "-c", "chown -R seatunnel /home/seatunnel/tmp/");
+    }
+
+    @TestTemplate
+    public void testSftpFileReadAndWrite(TestContainer container)
+            throws IOException, InterruptedException {
+        // test write sftp excel file
+        Container.ExecResult excelWriteResult =
+                container.executeJob("/excel/fakesource_to_sftp_excel.conf");
+        Assertions.assertEquals(0, excelWriteResult.getExitCode(), excelWriteResult.getStderr());
+        // test read sftp excel file
+        Container.ExecResult excelReadResult =
+                container.executeJob("/excel/sftp_excel_to_assert.conf");
+        Assertions.assertEquals(0, excelReadResult.getExitCode(), excelReadResult.getStderr());
+        // test read sftp excel file with projection
+        Container.ExecResult excelProjectionReadResult =
+                container.executeJob("/excel/sftp_excel_projection_to_assert.conf");
+        Assertions.assertEquals(
+                0, excelProjectionReadResult.getExitCode(), excelProjectionReadResult.getStderr());
+        // test write sftp text file
+        Container.ExecResult textWriteResult =
+                container.executeJob("/text/fake_to_sftp_file_text.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        // test read skip header
+        Container.ExecResult textWriteAndSkipResult =
+                container.executeJob("/text/sftp_file_text_skip_headers.conf");
+        Assertions.assertEquals(0, textWriteAndSkipResult.getExitCode());
+        // test read sftp text file
+        Container.ExecResult textReadResult =
+                container.executeJob("/text/sftp_file_text_to_assert.conf");
+        Assertions.assertEquals(0, textReadResult.getExitCode());
+        // test read sftp text file with projection
+        Container.ExecResult textProjectionResult =
+                container.executeJob("/text/sftp_file_text_projection_to_assert.conf");
+        Assertions.assertEquals(0, textProjectionResult.getExitCode());
+        // test write sftp json file
+        Container.ExecResult jsonWriteResult =
+                container.executeJob("/json/fake_to_sftp_file_json.conf");
+        Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+        // test read sftp json file
+        Container.ExecResult jsonReadResult =
+                container.executeJob("/json/sftp_file_json_to_assert.conf");
+        Assertions.assertEquals(0, jsonReadResult.getExitCode());
+    }
+
+    @AfterAll
+    @Override
+    public void tearDown() {
+        if (sftpContainer != null) {
+            sftpContainer.close();
+        }
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/e2e.xlsx b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/e2e.xlsx
new file mode 100644
index 000000000..87d363d7d
Binary files /dev/null and b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/e2e.xlsx differ
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/fakesource_to_sftp_excel.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/fakesource_to_sftp_excel.conf
new file mode 100644
index 000000000..c6c7b2e13
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/fakesource_to_sftp_excel.conf
@@ -0,0 +1,89 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    result_table_name = "sftp"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "/tmp/seatunnel/excel"
+    source_table_name = "sftp"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "excel"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_excel_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_excel_projection_to_assert.conf
new file mode 100644
index 000000000..32e13206d
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_excel_projection_to_assert.conf
@@ -0,0 +1,115 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/read/excel"
+    result_table_name = "sftp"
+    file_format_type = excel
+    delimiter = ;
+    read_columns = [c_string, c_boolean]
+    skip_header_row_number = 1
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+
+sink {
+  Assert {
+    source_table_name = "sftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_excel_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_excel_to_assert.conf
new file mode 100644
index 000000000..3b9cc0527
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/excel/sftp_excel_to_assert.conf
@@ -0,0 +1,135 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in seatunnel config
+######
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  SftpFile {
+    path = "tmp/seatunnel/read/excel"
+    result_table_name = "sftp"
+    file_format_type = excel
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    delimiter = ";"
+    skip_header_row_number = 1
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+
+sink {
+  Assert {
+    source_table_name = "sftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
+
+
+
+
+
+
+
+
+
+
+
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/e2e.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/e2e.json
new file mode 100644
index 000000000..aff56314e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/e2e.json
@@ -0,0 +1,5 @@
+{"c_map":{"ccQcS":"PrhhP","ypJZu":"MsOdX","YFBJW":"iPXGR","ipjwT":"kcgPQ","EpKKR":"jgRfX"},"c_array":[887776100,1633238485,1009033208,600614572,1487972145],"c_string":"WArEB","c_boolean":false,"c_tinyint":-90,"c_smallint":15920,"c_int":1127427935,"c_bigint":4712806879122100224,"c_float":1.620476E38,"c_double":2.750908810407852E307,"c_bytes":"Q3NrVnQ=","c_date":"2022-04-27","c_decimal":88574263949141714798.835853182708550244,"c_timestamp":"2022-01-26T17:39:00","c_row":{"C_MAP":{"IVaKD":"b [...]
+{"c_map":{"AKiQx":"wIIdk","zgunZ":"qvHRy","ohVQL":"WfBPo","EzUcN":"yPhVF","qusBc":"FWbcI"},"c_array":[1837821269,980724530,2085935679,386596035,1433416218],"c_string":"LGMAw","c_boolean":false,"c_tinyint":-65,"c_smallint":25802,"c_int":1312064317,"c_bigint":4434124023629949952,"c_float":1.0186125E38,"c_double":3.0746920457833206E307,"c_bytes":"V2pjem4=","c_date":"2022-04-21","c_decimal":1943815605574160687.499688237951975681,"c_timestamp":"2022-08-09T09:32:00","c_row":{"C_MAP":{"qMdUz":" [...]
+{"c_map":{"VLlqs":"OwUpp","MWXek":"KDEYD","RAZII":"zGJSJ","wjBNl":"IPTvu","YkGPS":"ORquf"},"c_array":[1530393427,2055877022,1389865473,926021483,402841214],"c_string":"TNcNF","c_boolean":false,"c_tinyint":-93,"c_smallint":26429,"c_int":1890712921,"c_bigint":78884499049828080,"c_float":7.816842E37,"c_double":7.852574522011583E307,"c_bytes":"cHhzZVA=","c_date":"2022-06-05","c_decimal":32486229951636021942.906126821535443395,"c_timestamp":"2022-04-09T16:03:00","c_row":{"C_MAP":{"yIfRN":"gTB [...]
+{"c_map":{"OSHIu":"FlSum","MaSwp":"KYQkK","iXmjf":"zlkgq","jOBeN":"RDfwI","mNmag":"QyxeW"},"c_array":[1632475346,1988402914,1222138765,1952120146,1223582179],"c_string":"fUmcz","c_boolean":false,"c_tinyint":86,"c_smallint":2122,"c_int":798530029,"c_bigint":4622710207120546816,"c_float":2.7438526E38,"c_double":3.710018378162975E306,"c_bytes":"WWlCdWk=","c_date":"2022-10-08","c_decimal":21195432655142738238.345609599825344131,"c_timestamp":"2022-01-12T10:58:00","c_row":{"C_MAP":{"HdaHZ":"K [...]
+{"c_map":{"aDAzK":"sMIOi","NSyDX":"TKSoT","JLxhC":"NpeWZ","LAjup":"KmHDA","HUIPE":"yAOKq"},"c_array":[1046349188,1243865078,849372657,522012053,644827083],"c_string":"pwRSn","c_boolean":true,"c_tinyint":55,"c_smallint":14285,"c_int":290002708,"c_bigint":4717741595193431040,"c_float":3.0965473E38,"c_double":1.2984472295257766E308,"c_bytes":"TE1oUWg=","c_date":"2022-05-05","c_decimal":75406296065465000885.249652183329686608,"c_timestamp":"2022-07-05T14:40:00","c_row":{"C_MAP":{"WTqxL":"RuJ [...]
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/fake_to_sftp_file_json.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/fake_to_sftp_file_json.conf
new file mode 100644
index 000000000..9f9ae75b2
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/fake_to_sftp_file_json.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+source {
+  FakeSource {
+    result_table_name = "sftp"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/json"
+    source_table_name = "sftp"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "json"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_file_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_file_json_to_assert.conf
new file mode 100644
index 000000000..5c91f2fd6
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/json/sftp_file_json_to_assert.conf
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/read/json"
+    file_format_type = "json"
+    result_table_name = "sftp"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    result_table_name = "sftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/e2e.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/e2e.txt
new file mode 100644
index 000000000..9871cd85e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/e2e.txt
@@ -0,0 +1,5 @@
+uDDrwsQQYONTNeUBIOnLAgunvDqLBObroRzdEdvDgRmgaeFyFH5456857591576298739157764687713794636442057612252MTDnafalse3313846190943192276641872220071936002.4798444E389.52375328387482E307vcIGF2023-06-0776258155390368615610.7646252373186602912023-05-08 16:08:51ipToEdierOAbwQfQzObWqiRhjkWYaMKdCbjurhstsWrAVlRyyR2905930362869031292782506910815576701385108050hArFutrue12631169122166306155952414159791708165.949173E372.1775762383875058E307kMlgO2023-05-20 [...]
+QIpzzZNFkLwARZDSdwdBzkegCdIRVYJnuXgxNXytAJxxaTzmDF16603816781145850255103997497062535321459349811xaTOktrue5327578191749099325840234439082792961.955231E381.5072154481920294E308GDWOu2023-05-0581449039533149712064.4515003874168475032023-07-06 22:34:11sfgxhqvOLzjdTSNcNaWfEnZqvQraSSuMPazCGhPmSrGuxggqGh111449466287130860562118177510004750271267350957FDhTstrue96247293946402921952995131535667203.3240283E384.473485404447698E307YFdwf2023-02-04294 [...]
+xVJPgVlosBlTYSkmJCqKHMXzbZkNQKInuVMZeYGhsmzUmcLyPx137745493211075991209783701051546835517166168384qcYaifalse8318050110096656524405690917018449922.9617934E371.8901064340036343E307jaKMq2023-05-1275317114043170470995.9654034735914367862023-05-18 08:09:22raGGBnHsNwMZKemkFErUbedNjSllNcKOVUGdTpXcHGSVphHsNE86377304018502081846122308810391870441519757437JCRZStrue1829974183977114228752256792969205767.9090967E371.6286963710372255E308NBHUB2023-05-0 [...]
+dBgFeTKkCfnxCljyGfNEurEzCVgwpsHgmcOfYXiQHxeeQNjQuq1961913761867016982512369059615238191571813320BTfhbfalse652666522281866957533025299230722.1456136E381.2398422714159417E308YOiwg2023-10-2433001899362876139955.7235198795513055732023-06-23 13:46:46jsvmHLHlXCGFKwuqlTwAjdMckElrmqgBWvOuuKuWxcinFZWSky19959088245502706421265289671411088181469730839vUyULtrue952655754382886132164227350822215681.9033253E381.0966562906060974E308XFeKf2023-09-1731084 [...]
+obtYzIHOTKsABVtirEKEMYUYobsYlDJcFbpQUYvGxCcKlnswEG8096984004544201585383739017658796661353001394xchcntrue853141253976762312923177914159380482.8480754E381.055208146200822E308MSkTD2023-11-2420361788179232141281.9718823433892185262023-10-25 11:47:50gdCWZMGESyarjQPopBhDwKnOyDvaUDgQOEDRCmfUAagfnDDPqV8473436731118772451890654127233667151574025969ewJzLtrue6321769209768782446484076920790579202.7134378E381.1883616449174808E308STvOu2023-10-082179 [...]
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/fake_to_sftp_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/fake_to_sftp_file_text.conf
new file mode 100644
index 000000000..e17ff90af
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/fake_to_sftp_file_text.conf
@@ -0,0 +1,87 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+source {
+  FakeSource {
+    result_table_name = "sftp"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/text"
+    source_table_name = "sftp"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format_type = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+    compress_codec = "lzo"
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_projection_to_assert.conf
new file mode 100644
index 000000000..4df33b6b5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_projection_to_assert.conf
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/read/text"
+    file_format_type = "text"
+    result_table_name = "sftp"
+    read_columns = [c_string, c_boolean, c_double]
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "sftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_skip_headers.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_skip_headers.conf
new file mode 100644
index 000000000..2039700a7
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_skip_headers.conf
@@ -0,0 +1,137 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/read/text"
+    result_table_name = "sftp"
+    file_format_type = "text"
+    skip_header_row_number = 1
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "sftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 4
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_to_assert.conf
new file mode 100644
index 000000000..e122de454
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-sftp-e2e/src/test/resources/text/sftp_file_text_to_assert.conf
@@ -0,0 +1,136 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  # You can set flink configuration here
+  execution.parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 1
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  SftpFile {
+    host = "sftp"
+    port = 22
+    user = seatunnel
+    password = pass
+    path = "tmp/seatunnel/read/text"
+    file_format_type = "text"
+    result_table_name = "sftp"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    source_table_name = "sftp"
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index df06e85ea..17a180aac 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -36,6 +36,7 @@
         <module>connector-influxdb-e2e</module>
         <module>connector-amazondynamodb-e2e</module>
         <module>connector-file-local-e2e</module>
+        <module>connector-file-sftp-e2e</module>
         <module>connector-cassandra-e2e</module>
         <module>connector-neo4j-e2e</module>
         <module>connector-http-e2e</module>