Posted to commits@seatunnel.apache.org by ga...@apache.org on 2022/10/20 11:25:31 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2][S3] Add S3 file source & sink connector (#3119)

This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new f27d68ca9 [Feature][Connector-V2][S3] Add S3 file source & sink connector (#3119)
f27d68ca9 is described below

commit f27d68ca9cd3698532dd9afa45b71a03930a5474
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Thu Oct 20 19:25:26 2022 +0800

    [Feature][Connector-V2][S3] Add S3 file source & sink connector (#3119)
    
    * [Feature][Connector-V2][S3] Add S3 file source & sink connector
---
 docs/en/connector-v2/sink/S3File.md                | 204 +++++++++++++++++
 docs/en/connector-v2/source/S3File.md              | 243 +++++++++++++++++++++
 plugin-mapping.properties                          |   4 +-
 .../seatunnel/file/config/FileSystemType.java      |   3 +-
 .../connector-file/connector-file-s3/pom.xml       |  70 ++++++
 .../seatunnel/file/s3/config/S3Conf.java           |  47 ++++
 .../seatunnel/file/s3/config/S3Config.java}        |  22 +-
 .../seatunnel/file/s3/sink/S3FileSink.java         |  52 +++++
 .../seatunnel/file/s3/source/S3FileSource.java     |  95 ++++++++
 .../services/org.apache.hadoop.fs.FileSystem       |  16 ++
 seatunnel-connectors-v2/connector-file/pom.xml     |   1 +
 seatunnel-dist/pom.xml                             |  12 +
 12 files changed, 751 insertions(+), 18 deletions(-)

diff --git a/docs/en/connector-v2/sink/S3File.md b/docs/en/connector-v2/sink/S3File.md
new file mode 100644
index 000000000..4f0e3376f
--- /dev/null
+++ b/docs/en/connector-v2/sink/S3File.md
@@ -0,0 +1,204 @@
+# S3File
+
+> S3 file sink connector
+
+## Description
+
+Output data to the AWS S3 file system.
+
+> Tips: We made some trade-offs in order to support more file types, so we use the HDFS protocol to access S3 internally, and this connector needs some Hadoop dependencies.
+> It only supports Hadoop version **2.6.5+**.
+
+## Key features
+
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+By default, we use two-phase commit (2PC) to ensure `exactly-once` semantics.
+
+- [ ] [schema projection](../../concept/connector-v2-features.md)
+- [x] file format
+    - [x] text
+    - [x] csv
+    - [x] parquet
+    - [x] orc
+    - [x] json
+
+## Options
+
+| name                             | type    | required | default value                                             |
+|----------------------------------|---------|----------|-----------------------------------------------------------|
+| path                             | string  | yes      | -                                                         |
+| bucket                           | string  | yes      | -                                                         |
+| access_key                       | string  | yes      | -                                                         |
+| secret_key                       | string  | yes      | -                                                         |
+| file_name_expression             | string  | no       | "${transactionId}"                                        |
+| file_format                      | string  | no       | "text"                                                    |
+| filename_time_format             | string  | no       | "yyyy.MM.dd"                                              |
+| field_delimiter                  | string  | no       | '\001'                                                    |
+| row_delimiter                    | string  | no       | "\n"                                                      |
+| partition_by                     | array   | no       | -                                                         |
+| partition_dir_expression         | string  | no       | "${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/"                |
+| is_partition_field_write_in_file | boolean | no       | false                                                     |
+| sink_columns                     | array   | no       | When this parameter is empty, all fields are sink columns |
+| is_enable_transaction            | boolean | no       | true                                                      |
+| common-options                   |         | no       | -                                                         |
+
+### path [string]
+
+The target directory path.
+
+### bucket [string]
+
+The bucket address of the S3 file system, for example: `s3n://seatunnel-test`
+
+**Tips: The SeaTunnel S3 file connector only supports the `s3n` protocol; `s3` and `s3a` are not supported.**
+
+### access_key [string]
+
+The access key of the S3 file system.
+
+### secret_key [string]
+
+The secret key of the S3 file system.
+
+### file_name_expression [string]
+
+`file_name_expression` describes the expression used to name the files created under `path`. We can use the variable `${now}` or `${uuid}` in `file_name_expression`, for example `test_${uuid}_${now}`;
+`${now}` represents the current time, and its format can be defined with the option `filename_time_format`.
+
+Please note that if `is_enable_transaction` is `true`, we will automatically add `${transactionId}_` to the beginning of the file name.
+
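+For example, with the following (illustrative) settings:
+
+```hocon
+
+file_name_expression = "test_${uuid}_${now}"
+filename_time_format = "yyyy.MM.dd"
+file_format = "text"
+
+```
+
+the connector would produce files named like `<transactionId>_test_<uuid>_2022.10.20.txt`, where the uuid and transaction id are generated at write time.
+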
+### file_format [string]
+
+The following file types are supported:
+
+`text` `csv` `parquet` `orc` `json`
+
+Please note that the final file name will end with the file format's suffix; for example, the suffix of a text file is `txt`.
+
+### filename_time_format [string]
+
+When the format in the `file_name_expression` parameter is `xxxx-${now}`, `filename_time_format` can specify the time format used in the file name, and the default value is `yyyy.MM.dd`. The commonly used time formats are listed as follows:
+
+| Symbol | Description        |
+|--------|--------------------|
+| y      | Year               |
+| M      | Month              |
+| d      | Day of month       |
+| H      | Hour in day (0-23) |
+| m      | Minute in hour     |
+| s      | Second in minute   |
+
+See [Java SimpleDateFormat](https://docs.oracle.com/javase/tutorial/i18n/format/simpleDateFormat.html) for detailed time format syntax.
+
+### field_delimiter [string]
+
+The separator between columns in a row of data. Only needed by the `text` and `csv` file formats.
+
+### row_delimiter [string]
+
+The separator between rows in a file. Only needed by the `text` and `csv` file formats.
+
+### partition_by [array]
+
+Partition the data based on the selected fields.
+
+### partition_dir_expression [string]
+
+If `partition_by` is specified, we will generate the corresponding partition directory based on the partition information, and the final file will be placed in the partition directory.
+
+Default `partition_dir_expression` is `${k0}=${v0}/${k1}=${v1}/.../${kn}=${vn}/`. `k0` is the first partition field and `v0` is the value of the first partition field.
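+
+For example, a sketch with hypothetical partition fields:
+
+```hocon
+
+partition_by = ["age", "gender"]
+partition_dir_expression = "${k0}=${v0}/${k1}=${v1}/"
+
+```
+
+A row with `age = 26` and `gender = male` would then be written under `age=26/gender=male/` inside the target `path`.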
+
+### is_partition_field_write_in_file [boolean]
+
+If `is_partition_field_write_in_file` is `true`, the partition field and its value will be written into the data file.
+
+For example, if you want to write a Hive data file, its value should be `false`.
+
+### sink_columns [array]
+
+Which columns need to be written to the file; the default is all of the columns obtained from the `Transform` or `Source`.
+The order of the fields determines the order in which the data is actually written to the file.
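+
+For example, `sink_columns = ["name", "age"]` (hypothetical field names) writes only those two fields, in that order.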
+
+### is_enable_transaction [boolean]
+
+If `is_enable_transaction` is `true`, we will ensure that data will not be lost or duplicated when it is written to the target directory.
+
+Please note that if `is_enable_transaction` is `true`, we will automatically add `${transactionId}_` to the beginning of the file name.
+
+Only `true` is supported now.
+
+### common options
+
+Sink plugin common parameters, please refer to [Sink Common Options](common-options.md) for details.
+
+## Example
+
+For the text file format:
+
+```hocon
+
+  S3File {
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    bucket = "s3n://seatunnel-test"
+    tmp_path = "/tmp/seatunnel"
+    path="/seatunnel/text"
+    row_delimiter="\n"
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="text"
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+  }
+
+```
+
+For the parquet file format:
+
+```hocon
+
+  S3File {
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    bucket = "s3n://seatunnel-test"
+    tmp_path = "/tmp/seatunnel"
+    path="/seatunnel/parquet"
+    row_delimiter="\n"
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="parquet"
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+  }
+
+```
+
+For the orc file format:
+
+```hocon
+
+  S3File {
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    bucket = "s3n://seatunnel-test"
+    tmp_path = "/tmp/seatunnel"
+    path="/seatunnel/orc"
+    row_delimiter="\n"
+    partition_dir_expression="${k0}=${v0}"
+    is_partition_field_write_in_file=true
+    file_name_expression="${transactionId}_${now}"
+    file_format="orc"
+    filename_time_format="yyyy.MM.dd"
+    is_enable_transaction=true
+  }
+
+```
+
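+For the json file format, a minimal sketch (only `path` and `file_format` differ from the examples above):
+
+```hocon
+
+  S3File {
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    bucket = "s3n://seatunnel-test"
+    tmp_path = "/tmp/seatunnel"
+    path="/seatunnel/json"
+    file_format="json"
+    is_enable_transaction=true
+  }
+
+```
+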
+## Changelog
+
+| Version    | Date       | Pull Request                                                    | Subject      |
+|------------|------------|-----------------------------------------------------------------|--------------|
+| 2.2.0-beta | 2022-10-17 | [3119](https://github.com/apache/incubator-seatunnel/pull/3119) | First commit |
\ No newline at end of file
diff --git a/docs/en/connector-v2/source/S3File.md b/docs/en/connector-v2/source/S3File.md
new file mode 100644
index 000000000..3178726f2
--- /dev/null
+++ b/docs/en/connector-v2/source/S3File.md
@@ -0,0 +1,243 @@
+# S3File
+
+> S3 file source connector
+
+## Description
+
+Read data from the AWS S3 file system.
+
+> Tips: We made some trade-offs in order to support more file types, so we use the HDFS protocol to access S3 internally, and this connector needs some Hadoop dependencies.
+> It only supports Hadoop version **2.6.5+**.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [x] [exactly-once](../../concept/connector-v2-features.md)
+
+All of the data in a split is read in a single pollNext call. Which splits have been read is saved in the snapshot.
+
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [x] file format
+  - [x] text
+  - [x] csv
+  - [x] parquet
+  - [x] orc
+  - [x] json
+
+## Options
+
+| name                      | type    | required | default value       |
+|---------------------------|---------|----------|---------------------|
+| path                      | string  | yes      | -                   |
+| type                      | string  | yes      | -                   |
+| bucket                    | string  | yes      | -                   |
+| access_key                | string  | yes      | -                   |
+| secret_key                | string  | yes      | -                   |
+| delimiter                 | string  | no       | \001                |
+| parse_partition_from_path | boolean | no       | true                |
+| date_format               | string  | no       | yyyy-MM-dd          |
+| datetime_format           | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format               | string  | no       | HH:mm:ss            |
+| schema                    | config  | no       | -                   |
+| common-options            |         | no       | -                   |
+
+### path [string]
+
+The source file path.
+
+### delimiter [string]
+
+The field delimiter, used to tell the connector how to split fields when reading text files.
+
+Default `\001`, the same as Hive's default delimiter.
+
+### parse_partition_from_path [boolean]
+
+Controls whether the partition keys and values are parsed from the file path.
+
+For example, if you read a file from the path `s3n://hadoop-cluster/tmp/seatunnel/parquet/name=tyrantlucifer/age=26`,
+
+every record read from the file will have these two fields added:
+
+| name           | age |
+|----------------|-----|
+| tyrantlucifer  | 26  |
+
+Tips: **Do not define partition fields in the schema option.**
+
+### date_format [string]
+
+The date type format, used to tell the connector how to convert a string to a date. The following formats are supported:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+Default: `yyyy-MM-dd`.
+
+### datetime_format [string]
+
+The datetime type format, used to tell the connector how to convert a string to a datetime. The following formats are supported:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+Default: `yyyy-MM-dd HH:mm:ss`.
+
+### time_format [string]
+
+The time type format, used to tell the connector how to convert a string to a time. The following formats are supported:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+Default: `HH:mm:ss`.
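+
+For example, to parse dates like `2022/10/20` and datetimes like `20221020112531` (hypothetical sample values), you could set:
+
+```hocon
+
+date_format = "yyyy/MM/dd"
+datetime_format = "yyyyMMddHHmmss"
+
+```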
+
+### type [string]
+
+The file type. The following file types are supported:
+
+`text` `csv` `parquet` `orc` `json`
+
+If you set the file type to `json`, you should also set the `schema` option to tell the connector how to parse the data into the rows you want.
+
+For example:
+
+suppose the upstream data is the following:
+
+```json
+
+{"code":  200, "data":  "get success", "success":  true}
+
+```
+
+You can also save multiple records in one file and separate them with newlines:
+
+```json lines
+
+{"code":  200, "data":  "get success", "success":  true}
+{"code":  300, "data":  "get failed", "success":  false}
+
+```
+
+You should set the schema as follows:
+
+```hocon
+
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
+
+The connector will generate data as follows:
+
+| code | data        | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+If you set the file type to `parquet` or `orc`, the `schema` option is not required; the connector can read the schema of the upstream data automatically.
+
+If you set the file type to `text` or `csv`, you can choose whether to specify the schema.
+
+For example, if the upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
+
+```
+
+If you do not specify a schema, the connector will treat each row of the upstream data as a single field:
+
+| content                |
+|------------------------|
+| tyrantlucifer#26#male  | 
+
+If you specify a schema, you should also set the `delimiter` option, except for the CSV file type.
+
+You should set the schema and delimiter as follows:
+
+```hocon
+
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string 
+    }
+}
+
+```
+
+The connector will generate data as follows:
+
+| name          | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26  | male   |
+
+### bucket [string]
+
+The bucket address of the S3 file system, for example: `s3n://seatunnel-test`
+
+**Tips: The SeaTunnel S3 file connector only supports the `s3n` protocol; `s3` and `s3a` are not supported.**
+
+### access_key [string]
+
+The access key of the S3 file system.
+
+### secret_key [string]
+
+The secret key of the S3 file system.
+
+### schema [config]
+
+#### fields [config]
+
+The schema of upstream data.
+
+### common options 
+
+Source plugin common parameters, please refer to [Source Common Options](common-options.md) for details.
+
+## Example
+
+```hocon
+
+  S3File {
+    path = "/seatunnel/text"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxx"
+    bucket = "s3n://seatunnel-test"
+    type = "text"
+  }
+
+```
+
+```hocon
+
+  S3File {
+    path = "/seatunnel/json"
+    bucket = "s3n://seatunnel-test"
+    access_key = "xxxxxxxxxxxxxxxxx"
+    secret_key = "xxxxxxxxxxxxxxxxxxxxxx"
+    type = "json"
+    schema {
+      fields {
+        id = int 
+        name = string
+      }
+    }
+  }
+
+```
+
+## Changelog
+
+| Version    | Date       | Pull Request                                                    | Subject      |
+|------------|------------|-----------------------------------------------------------------|--------------|
+| 2.2.0-beta | 2022-10-17 | [3119](https://github.com/apache/incubator-seatunnel/pull/3119) | First commit |
\ No newline at end of file
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 0b786891e..fa70c6ea7 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -130,4 +130,6 @@ seatunnel.sink.Sentry = connector-sentry
 seatunnel.source.MongoDB = connector-mongodb
 seatunnel.sink.MongoDB = connector-mongodb
 seatunnel.source.Iceberg = connector-iceberg
-seatunnel.source.influxdb = connector-influxdb
\ No newline at end of file
+seatunnel.source.InfluxDB = connector-influxdb
+seatunnel.source.S3File = connector-file-s3
+seatunnel.sink.S3File = connector-file-s3
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
index f1d271f36..9dbc34a5d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
@@ -23,7 +23,8 @@ public enum FileSystemType implements Serializable {
     HDFS("HdfsFile"),
     LOCAL("LocalFile"),
     OSS("OssFile"),
-    FTP("FtpFile");
+    FTP("FtpFile"),
+    S3("S3File");
 
     private final String fileSystemPluginName;
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml
new file mode 100644
index 000000000..d212bb99f
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/pom.xml
@@ -0,0 +1,70 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <parent>
+        <artifactId>connector-file</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>${revision}</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>connector-file-s3</artifactId>
+
+    <properties>
+        <hadoop-aws.version>2.6.5</hadoop-aws.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-file-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2</artifactId>
+            <scope>provided</scope>
+            <exclusions>
+                <exclusion>
+                    <artifactId>avro</artifactId>
+                    <groupId>org.apache.avro</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-aws</artifactId>
+            <version>${hadoop-aws.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>jdk.tools</artifactId>
+                    <groupId>jdk.tools</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
new file mode 100644
index 000000000..728ff14fa
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Conf.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.config;
+
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.util.HashMap;
+
+public class S3Conf extends HadoopConf {
+    private final String fsHdfsImpl = "org.apache.hadoop.fs.s3native.NativeS3FileSystem";
+
+    @Override
+    public String getFsHdfsImpl() {
+        return fsHdfsImpl;
+    }
+
+    private S3Conf(String hdfsNameKey) {
+        super(hdfsNameKey);
+    }
+
+    public static HadoopConf buildWithConfig(Config config) {
+        HadoopConf hadoopConf = new S3Conf(config.getString(S3Config.S3_BUCKET));
+        HashMap<String, String> s3Options = new HashMap<>();
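+        // wire the user-supplied credentials into Hadoop's s3n configuration keys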
+        s3Options.put("fs.s3n.awsAccessKeyId", config.getString(S3Config.S3_ACCESS_KEY));
+        s3Options.put("fs.s3n.awsSecretAccessKey", config.getString(S3Config.S3_SECRET_KEY));
+        s3Options.put("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem");
+        hadoopConf.setExtraOptions(s3Options);
+        return hadoopConf;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java
similarity index 62%
copy from seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
copy to seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java
index f1d271f36..f7a6b6116 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileSystemType.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/config/S3Config.java
@@ -15,23 +15,13 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.connectors.seatunnel.file.config;
+package org.apache.seatunnel.connectors.seatunnel.file.s3.config;
 
-import java.io.Serializable;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 
-public enum FileSystemType implements Serializable {
-    HDFS("HdfsFile"),
-    LOCAL("LocalFile"),
-    OSS("OssFile"),
-    FTP("FtpFile");
+public class S3Config extends BaseSourceConfig {
+    public static final String S3_ACCESS_KEY = "access_key";
+    public static final String S3_SECRET_KEY = "secret_key";
+    public static final String S3_BUCKET = "bucket";
 
-    private final String fileSystemPluginName;
-
-    FileSystemType(String fileSystemPluginName) {
-        this.fileSystemPluginName = fileSystemPluginName;
-    }
-
-    public String getFileSystemPluginName() {
-        return fileSystemPluginName;
-    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
new file mode 100644
index 000000000..8db00f149
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.sink;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSink.class)
+public class S3FileSink extends BaseFileSink {
+    @Override
+    public String getPluginName() {
+        return FileSystemType.S3.getFileSystemPluginName();
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        super.prepare(pluginConfig);
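+        // fail fast when any of the required S3 options is missing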
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+                S3Config.FILE_PATH, S3Config.S3_BUCKET,
+                S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+        }
+        hadoopConf = S3Conf.buildWithConfig(pluginConfig);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
new file mode 100644
index 000000000..d77c3d4e5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.s3.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
+import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
+import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+import java.io.IOException;
+
+@AutoService(SeaTunnelSource.class)
+public class S3FileSource extends BaseFileSource {
+    @Override
+    public String getPluginName() {
+        return FileSystemType.S3.getFileSystemPluginName();
+    }
+
+    @Override
+    public void prepare(Config pluginConfig) throws PrepareFailException {
+        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig,
+                S3Config.FILE_PATH, S3Config.FILE_TYPE, S3Config.S3_BUCKET,
+                S3Config.S3_ACCESS_KEY, S3Config.S3_SECRET_KEY);
+        if (!result.isSuccess()) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+        }
+        readStrategy = ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_TYPE));
+        readStrategy.setPluginConfig(pluginConfig);
+        String path = pluginConfig.getString(S3Config.FILE_PATH);
+        hadoopConf = S3Conf.buildWithConfig(pluginConfig);
+        try {
+            filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
+        } catch (IOException e) {
+            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path failed.");
+        }
+        // support user-defined schema
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE).toUpperCase());
+        // only the json, text and csv types support a user-defined schema for now
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+            switch (fileFormat) {
+                case CSV:
+                case TEXT:
+                case JSON:
+                    Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    SeaTunnelRowType userDefinedSchema = SeaTunnelSchema
+                            .buildWithConfig(schemaConfig)
+                            .getSeaTunnelRowType();
+                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
+                    break;
+                case ORC:
+                case PARQUET:
+                    throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                default:
+                    // should never be reached
+                    throw new UnsupportedOperationException("SeaTunnel does not support this file format");
+            }
+        } else {
+            try {
+                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
+            } catch (FilePluginException e) {
+                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+            }
+        }
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
new file mode 100644
index 000000000..0a7ee15c5
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/resources/META-INF/services/org.apache.hadoop.fs.FileSystem
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+org.apache.hadoop.fs.s3native.NativeS3FileSystem
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/pom.xml b/seatunnel-connectors-v2/connector-file/pom.xml
index fff8e0cd9..60d73ab9d 100644
--- a/seatunnel-connectors-v2/connector-file/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/pom.xml
@@ -40,6 +40,7 @@
         <module>connector-file-oss</module>
         <module>connector-file-ftp</module>
         <module>connector-file-base-hadoop</module>
+        <module>connector-file-s3</module>
     </modules>
 
     <build>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index a0d0350d3..ef635378f 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -272,6 +272,18 @@
                     <version>${project.version}</version>
                     <scope>provided</scope>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-influxdb</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.seatunnel</groupId>
+                    <artifactId>connector-file-s3</artifactId>
+                    <version>${project.version}</version>
+                    <scope>provided</scope>
+                </dependency>
             </dependencies>
         </profile>
         <profile>