You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/30 15:22:59 UTC

[incubator-seatunnel] branch dev updated: [feature][e2e][file] Add local file connector e2e (#3221)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 185621ae9 [feature][e2e][file] Add local file connector e2e (#3221)
185621ae9 is described below

commit 185621ae9314971a9f55f7b36444b115b88c2570
Author: Tyrantlucifer <Ty...@gmail.com>
AuthorDate: Sun Oct 30 23:22:54 2022 +0800

    [feature][e2e][file] Add local file connector e2e (#3221)
    
    * [Feature][Connector-V2-e2e][LocalFile] Add local file connector e2e
    
    * [Improve][e2e][Flink] Execute extra commands on task manager container
    
    * [Feature][Connector-V2-e2e][LocalFile] Optimize test cases
---
 .../{ => connector-file-local-e2e}/pom.xml         |  28 +----
 .../e2e/connector/file/local/LocalFileIT.java      |  78 ++++++++++++
 .../src/test/resources/json/e2e.json               |   5 +
 .../resources/json/fake_to_local_file_json.conf    |  82 +++++++++++++
 .../resources/json/local_file_json_to_assert.conf  | 131 +++++++++++++++++++++
 .../src/test/resources/log4j.properties            |  22 ++++
 .../src/test/resources/orc/e2e.orc                 | Bin 0 -> 5471 bytes
 .../test/resources/orc/fake_to_local_file_orc.conf |  82 +++++++++++++
 .../resources/orc/local_file_orc_to_assert.conf    |  97 +++++++++++++++
 .../src/test/resources/parquet/e2e.parquet         | Bin 0 -> 9730 bytes
 .../parquet/fake_to_local_file_parquet.conf        |  82 +++++++++++++
 .../parquet/local_file_parquet_to_assert.conf      |  97 +++++++++++++++
 .../src/test/resources/text/e2e.txt                |   5 +
 .../resources/text/fake_to_local_file_text.conf    |  82 +++++++++++++
 .../resources/text/local_file_text_to_assert.conf  | 131 +++++++++++++++++++++
 seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml   |   1 +
 16 files changed, 898 insertions(+), 25 deletions(-)

diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml
similarity index 58%
copy from seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
copy to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml
index d5404c565..db73ba531 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/pom.xml
@@ -17,41 +17,19 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <artifactId>seatunnel-e2e</artifactId>
+        <artifactId>seatunnel-connector-v2-e2e</artifactId>
         <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
-    <packaging>pom</packaging>
-    <modules>
-        <module>connector-assert-e2e</module>
-        <module>connector-jdbc-e2e</module>
-        <module>connector-redis-e2e</module>
-        <module>connector-clickhouse-e2e</module>
-        <module>connector-influxdb-e2e</module>
-    </modules>
 
-    <artifactId>seatunnel-connector-v2-e2e</artifactId>
+    <artifactId>connector-file-local-e2e</artifactId>
 
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-e2e-common</artifactId>
+            <artifactId>connector-file-local</artifactId>
             <version>${project.version}</version>
-            <type>test-jar</type>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-flink-starter</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>seatunnel-spark-starter</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
         </dependency>
     </dependencies>
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
new file mode 100644
index 000000000..970810484
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.file.local;
+
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.nio.file.Path;
+
+public class LocalFileIT extends TestSuiteBase {
+
+    /**
+     * Copy data files to container
+     */
+    @TestContainerExtension
+    private final ContainerExtendedFactory extendedFactory = container -> {
+        Path jsonPath = ContainerUtil.getResourcesFile("/json/e2e.json").toPath();
+        Path orcPath = ContainerUtil.getResourcesFile("/orc/e2e.orc").toPath();
+        Path parquetPath = ContainerUtil.getResourcesFile("/parquet/e2e.parquet").toPath();
+        Path textPath = ContainerUtil.getResourcesFile("/text/e2e.txt").toPath();
+        container.copyFileToContainer(MountableFile.forHostPath(jsonPath), "/seatunnel/read/json/name=tyrantlucifer/hobby=coding/e2e.json");
+        container.copyFileToContainer(MountableFile.forHostPath(orcPath), "/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc");
+        container.copyFileToContainer(MountableFile.forHostPath(parquetPath), "/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet");
+        container.copyFileToContainer(MountableFile.forHostPath(textPath), "/seatunnel/read/text/name=tyrantlucifer/hobby=coding/e2e.txt");
+    };
+
+    @TestTemplate
+    public void testLocalFileReadAndWrite(TestContainer container) throws IOException, InterruptedException {
+        // test write local text file
+        Container.ExecResult textWriteResult = container.executeJob("/text/fake_to_local_file_text.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        // test read local text file
+        Container.ExecResult textReadResult = container.executeJob("/text/local_file_text_to_assert.conf");
+        Assertions.assertEquals(0, textReadResult.getExitCode());
+        // test write local json file
+        Container.ExecResult jsonWriteResult = container.executeJob("/json/fake_to_local_file_json.conf");
+        Assertions.assertEquals(0, jsonWriteResult.getExitCode());
+        // test read local json file
+        Container.ExecResult jsonReadResult = container.executeJob("/json/local_file_json_to_assert.conf");
+        Assertions.assertEquals(0, jsonReadResult.getExitCode());
+        // test write local orc file
+        Container.ExecResult orcWriteResult = container.executeJob("/orc/fake_to_local_file_orc.conf");
+        Assertions.assertEquals(0, orcWriteResult.getExitCode());
+        // test read local orc file
+        Container.ExecResult orcReadResult = container.executeJob("/orc/local_file_orc_to_assert.conf");
+        Assertions.assertEquals(0, orcReadResult.getExitCode());
+        // test write local parquet file
+        Container.ExecResult parquetWriteResult = container.executeJob("/parquet/fake_to_local_file_parquet.conf");
+        Assertions.assertEquals(0, parquetWriteResult.getExitCode());
+        // test read local parquet file
+        Container.ExecResult parquetReadResult = container.executeJob("/parquet/local_file_parquet_to_assert.conf");
+        Assertions.assertEquals(0, parquetReadResult.getExitCode());
+    }
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e.json b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e.json
new file mode 100644
index 000000000..aff56314e
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/e2e.json
@@ -0,0 +1,5 @@
+{"c_map":{"ccQcS":"PrhhP","ypJZu":"MsOdX","YFBJW":"iPXGR","ipjwT":"kcgPQ","EpKKR":"jgRfX"},"c_array":[887776100,1633238485,1009033208,600614572,1487972145],"c_string":"WArEB","c_boolean":false,"c_tinyint":-90,"c_smallint":15920,"c_int":1127427935,"c_bigint":4712806879122100224,"c_float":1.620476E38,"c_double":2.750908810407852E307,"c_bytes":"Q3NrVnQ=","c_date":"2022-04-27","c_decimal":88574263949141714798.835853182708550244,"c_timestamp":"2022-01-26T17:39:00","c_row":{"C_MAP":{"IVaKD":"b [...]
+{"c_map":{"AKiQx":"wIIdk","zgunZ":"qvHRy","ohVQL":"WfBPo","EzUcN":"yPhVF","qusBc":"FWbcI"},"c_array":[1837821269,980724530,2085935679,386596035,1433416218],"c_string":"LGMAw","c_boolean":false,"c_tinyint":-65,"c_smallint":25802,"c_int":1312064317,"c_bigint":4434124023629949952,"c_float":1.0186125E38,"c_double":3.0746920457833206E307,"c_bytes":"V2pjem4=","c_date":"2022-04-21","c_decimal":1943815605574160687.499688237951975681,"c_timestamp":"2022-08-09T09:32:00","c_row":{"C_MAP":{"qMdUz":" [...]
+{"c_map":{"VLlqs":"OwUpp","MWXek":"KDEYD","RAZII":"zGJSJ","wjBNl":"IPTvu","YkGPS":"ORquf"},"c_array":[1530393427,2055877022,1389865473,926021483,402841214],"c_string":"TNcNF","c_boolean":false,"c_tinyint":-93,"c_smallint":26429,"c_int":1890712921,"c_bigint":78884499049828080,"c_float":7.816842E37,"c_double":7.852574522011583E307,"c_bytes":"cHhzZVA=","c_date":"2022-06-05","c_decimal":32486229951636021942.906126821535443395,"c_timestamp":"2022-04-09T16:03:00","c_row":{"C_MAP":{"yIfRN":"gTB [...]
+{"c_map":{"OSHIu":"FlSum","MaSwp":"KYQkK","iXmjf":"zlkgq","jOBeN":"RDfwI","mNmag":"QyxeW"},"c_array":[1632475346,1988402914,1222138765,1952120146,1223582179],"c_string":"fUmcz","c_boolean":false,"c_tinyint":86,"c_smallint":2122,"c_int":798530029,"c_bigint":4622710207120546816,"c_float":2.7438526E38,"c_double":3.710018378162975E306,"c_bytes":"WWlCdWk=","c_date":"2022-10-08","c_decimal":21195432655142738238.345609599825344131,"c_timestamp":"2022-01-12T10:58:00","c_row":{"C_MAP":{"HdaHZ":"K [...]
+{"c_map":{"aDAzK":"sMIOi","NSyDX":"TKSoT","JLxhC":"NpeWZ","LAjup":"KmHDA","HUIPE":"yAOKq"},"c_array":[1046349188,1243865078,849372657,522012053,644827083],"c_string":"pwRSn","c_boolean":true,"c_tinyint":55,"c_smallint":14285,"c_int":290002708,"c_bigint":4717741595193431040,"c_float":3.0965473E38,"c_double":1.2984472295257766E308,"c_bytes":"TE1oUWg=","c_date":"2022-05-05","c_decimal":75406296065465000885.249652183329686608,"c_timestamp":"2022-07-05T14:40:00","c_row":{"C_MAP":{"WTqxL":"RuJ [...]
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
new file mode 100644
index 000000000..0dc2cc054
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/fake_to_local_file_json.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path = "/tmp/seatunnel/json"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "json"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf
new file mode 100644
index 000000000..901f90984
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/json/local_file_json_to_assert.conf
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/json"
+    type = "json"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/log4j.properties b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p %c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc
new file mode 100644
index 000000000..507ce3703
Binary files /dev/null and b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/e2e.orc differ
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
new file mode 100644
index 000000000..647b7a17a
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/fake_to_local_file_orc.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path = "/tmp/seatunnel/orc"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "orc"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
new file mode 100644
index 000000000..eed920ab3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert.conf
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/parquet"
+    type = "parquet"
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/e2e.parquet b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/e2e.parquet
new file mode 100644
index 000000000..ddaad524d
Binary files /dev/null and b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/e2e.parquet differ
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
new file mode 100644
index 000000000..531dd8211
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/fake_to_local_file_parquet.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path = "/tmp/seatunnel/parquet"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "parquet"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
new file mode 100644
index 000000000..eed920ab3
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/parquet/local_file_parquet_to_assert.conf
@@ -0,0 +1,97 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/parquet"
+    type = "parquet"
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt
new file mode 100644
index 000000000..727d77044
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/e2e.txt
@@ -0,0 +1,5 @@
+{"Eayvq":"YVrWJ","kegtW":"jshQP","ShRel":"abmwR","vDjnr":"gnaVl","zGERi":"nlZlS"}[1208226851,1011688192,220827945,1047769453,1137173284]ajNqtfalse1203130814396025058012143372451870729.532721E371.566752199436781E308eqGaK2022-05-0984150670027331165724.3898288671923612152022-01-10 00:52:00{"YZCPo":"saHIr","NXuEq":"lRvfb","rpZUW":"TsaaM","fySFX":"jRTjq","kxmOB":"IYcoV"}[2014870053,823563564,783454480,651161159,704692328]mMPmRtrue221632040893809116054760376851640323.1975 [...]
+{"njZoW":"GMeYm","UcdNW":"QzeUB","KEUVX":"gBDFK","HYQAp":"wrkAU","CIuxE":"xbFow"}[2031111683,1178991118,2039039138,1613817066,110479538]NJdgitrue-89290489565971423719286406107975683.2706686E385.558510322527257E305TsBfe2022-11-0226360128806482056292.3973861503683968622022-07-19 01:55:00{"OYtro":"ujYpV","CmwWw":"scFCA","sLsBv":"OWHCB","pyEBC":"IBJaJ","dcnST":"EWNUE"}[1375044625,1699399583,1568825650,1259259717,1993279080]xGYNztrue-1061370319488882359202931209018536960 [...]
+{"RHdeD":"IfOaZ","qrTJC":"kyLmx","aangN":"lAeap","ZoLNq":"BUFPs","xgRbl":"wxinc"}[164178112,1126624441,15024852,1149751089,1521818730]YvrXsfalse232515032008388530651078388625039361.6630125E381.7843554887310442E308vpGjb2022-03-2178172562452015409780.5845612134798772192022-08-13 07:35:00{"tzgLt":"kDbVh","WQjRn":"qJbFw","CkgpP":"VAefn","ugYhg":"JsunV","IiEeb":"quvGf"}[1776928121,1957734724,524685509,561366753,938508932]mkHAitrue922741247279574969724970679742218241.7999 [...]
+{"jjfbQ":"YnIXv","yENje":"ffDvT","eMpYW":"LZvRw","oheFK":"QjTUE","OULct":"xrYhB"}[104016153,1372901715,1383968877,1615642149,195294861]vMstWfalse-7625594116342148311317935807474656001.792487E381.0428995536163822E308JSzTD2022-05-2455390102538242453238.6893233870974102472022-04-02 03:07:00{"YISEv":"PRNAA","iOZbP":"RuJAC","xUpOy":"KQrHA","KDQtr":"UZwht","KRfJK":"KZpCe"}[1746626910,1211890529,1600627099,866858633,1489006903]wfDHTfalse-126224418100044198439134512658444288 [...]
+{"Bdbdl":"oODJP","KqLlu":"vyGtf","jvGPe":"yRctE","rRyXw":"BSABQ","rHvsQ":"VCUqr"}[333070022,1078936427,1553422323,979329793,1601100765]Zbmuofalse1081556163411943031919830971434823681.1692632E381.0154981418689449E308IuDpI2022-06-1518027895538483530319.4903776087281118622022-06-23 00:40:00{"cqeow":"wXwCu","nFjrN":"zWila","lTedA":"WrGgl","NPLcF":"RXUhL","EIzgS":"MFnLE"}[814534518,1778618727,2107481454,1240038762,974703010]HtbvRtrue12423258182702762422924058106246453761 [...]
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
new file mode 100644
index 000000000..7bbe3cdb5
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/fake_to_local_file_text.conf
@@ -0,0 +1,82 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  FakeSource {
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  LocalFile {
+    path = "/tmp/seatunnel/text"
+    row_delimiter = "\n"
+    partition_dir_expression = "${k0}=${v0}"
+    is_partition_field_write_in_file = true
+    file_name_expression = "${transactionId}_${now}"
+    file_format = "text"
+    filename_time_format = "yyyy.MM.dd"
+    is_enable_transaction = true
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf
new file mode 100644
index 000000000..0f16a37b9
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/text/local_file_text_to_assert.conf
@@ -0,0 +1,131 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  execution.parallelism = 1
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+  job.mode = "BATCH"
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/text"
+    type = "text"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        c_timestamp = timestamp
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          c_int = int
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_double
+          field_type = double
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = name
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = hobby
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index d5404c565..46bb9df8f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -29,6 +29,7 @@
         <module>connector-redis-e2e</module>
         <module>connector-clickhouse-e2e</module>
         <module>connector-influxdb-e2e</module>
+        <module>connector-file-local-e2e</module>
     </modules>
 
     <artifactId>seatunnel-connector-v2-e2e</artifactId>