Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/09 13:49:35 UTC

[incubator-seatunnel] branch dev updated: [Improve][connector][file] Support user-defined schema for reading text file (#2976)

This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1c05ee0d7 [Improve][connector][file] Support user-defined schema for reading text file (#2976)
1c05ee0d7 is described below

commit 1c05ee0d7ee00f76c74ddfc17aebdfbd75aa733b
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Sun Oct 9 21:49:28 2022 +0800

    [Improve][connector][file] Support user-defined schema for reading text file (#2976)
---
 docs/en/connector-v2/source/FtpFile.md             | 107 +++++++++++++++++----
 docs/en/connector-v2/source/HdfsFile.md            |  90 ++++++++++++++---
 docs/en/connector-v2/source/LocalFile.md           |  87 ++++++++++++++---
 docs/en/connector-v2/source/OssFile.md             |  95 +++++++++++++++---
 .../file/hdfs/source/BaseHdfsFileSource.java       |  26 ++++-
 .../connector-file/connector-file-base/pom.xml     |   6 ++
 .../seatunnel/file/config/BaseSourceConfig.java    |   4 +
 .../file/source/reader/AbstractReadStrategy.java   |   7 ++
 .../seatunnel/file/source/reader/ReadStrategy.java |   4 +
 .../file/source/reader/TextReadStrategy.java       |  63 ++++++++++--
 .../seatunnel/file/ftp/source/FtpFileSource.java   |  25 +++--
 .../file/local/source/LocalFileSource.java         |  27 ++++--
 .../seatunnel/file/oss/source/OssFileSource.java   |  26 ++++-
 13 files changed, 480 insertions(+), 87 deletions(-)

diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 07a7f9134..0513cc1c5 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -21,16 +21,20 @@ Read data from ftp file server.
 
 ## Options
 
-| name           | type   | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| host           | string | yes      | -             |
-| port           | int    | yes      | -             |
-| user           | string | yes      | -             |
-| password       | string | yes      | -             |
-| path           | string | yes      | -             |
-| type           | string | yes      | -             |
-| schema         | config | no       | -             |
-| common-options |        | no       | -             |
+| name            | type    | required | default value       |
+|-----------------|---------|----------|---------------------|
+| host            | string  | yes      | -                   |
+| port            | int     | yes      | -                   |
+| user            | string  | yes      | -                   |
+| password        | string  | yes      | -                   |
+| path            | string  | yes      | -                   |
+| type            | string  | yes      | -                   |
+| delimiter       | string  | no       | \001                |
+| date_format     | string  | no       | yyyy-MM-dd          |
+| datetime_format | string  | no       | yyyy-MM-dd HH:mm:ss |
+| time_format     | string  | no       | HH:mm:ss            |
+| schema          | config  | no       | -                   |
+| common-options  |         | no       | -                   |
 
 ### host [string]
 
@@ -52,6 +56,40 @@ The target ftp password is required
 
 The source file path.
 
+### delimiter [string]
+
+Field delimiter, used to tell the connector how to split fields when reading text files.
+
+Defaults to `\001`, the same as Hive's default delimiter.
+
+### date_format [string]
+
+Date type format, used to tell the connector how to convert a string to a date. The following formats are supported:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+Defaults to `yyyy-MM-dd`.
+
+### datetime_format [string]
+
+Datetime type format, used to tell the connector how to convert a string to a datetime. The following formats are supported:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+Defaults to `yyyy-MM-dd HH:mm:ss`.
+
+### time_format [string]
+
+Time type format, used to tell the connector how to convert a string to a time. The following formats are supported:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+Defaults to `HH:mm:ss`.
+
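+For example, if a file stores datetime values like `2022/10/09 21:49:28`, a minimal sketch of these format options (using values from the supported lists above) would be:
+
+```hocon
+
+date_format = "yyyy/MM/dd"
+datetime_format = "yyyy/MM/dd HH:mm:ss"
+time_format = "HH:mm:ss"
+
+```
+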
+### schema [config]
+
+The schema information of upstream data.
+
 ### type [string]
 
 File type, supported as the following file types:
@@ -90,19 +128,45 @@ connector will generate data as the following:
 |------|-------------|---------|
 | 200  | get success | true    |
 
-If you assign file type to `text` `csv`, schema option not supported temporarily, but the subsequent features will support.
+If you assign the file type to `text` or `csv`, you can choose whether or not to specify the schema information.
 
-Now connector will treat the upstream data as the following:
+For example, suppose the upstream data is the following:
 
-| lines                             |
-|-----------------------------------|
-| The content of every line in file |
+```text
 
-### schema [config]
+tyrantlucifer#26#male
 
-#### fields [Config]
+```
+
+If you do not assign a data schema, the connector will treat the upstream data as the following:
+
+| content               |
+|-----------------------|
+| tyrantlucifer#26#male |
+
+If you assign a data schema, you should also assign the `delimiter` option, except for the CSV file type.
 
-The schema information of upstream data.
+You should assign the schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string 
+    }
+}
+
+```
+
+The connector will generate data as the following:
+
+| name          | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26  | male   |
 
 ### common options 
 
@@ -113,12 +177,17 @@ Source plugin common parameters, please refer to [Source Common Options](common-
 ```hocon
 
   FtpFile {
-    path = "/tmp/seatunnel/sink/parquet"
+    path = "/tmp/seatunnel/sink/text"
     host = "192.168.31.48"
     port = 21
     user = tyrantlucifer
     password = tianchao
     type = "text"
+    schema = {
+      fields {
+        name = string
+        age = int
+      }
+    }
+    delimiter = "#"
   }
 
 ```
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 25e3e5fc6..842d2ac5d 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -26,18 +26,52 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 
 ## Options
 
-| name           | type   | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| path           | string | yes      | -             |
-| type           | string | yes      | -             |
-| fs.defaultFS   | string | yes      | -             |
-| schema         | config | no       | -             |
-| common-options |        | no       | -             |
+| name            | type   | required | default value       |
+|-----------------|--------|----------|---------------------|
+| path            | string | yes      | -                   |
+| type            | string | yes      | -                   |
+| fs.defaultFS    | string | yes      | -                   |
+| delimiter       | string | no       | \001                |
+| date_format     | string | no       | yyyy-MM-dd          |
+| datetime_format | string | no       | yyyy-MM-dd HH:mm:ss |
+| time_format     | string | no       | HH:mm:ss            |
+| schema          | config | no       | -                   |
+| common-options  |        | no       | -                   |
 
 ### path [string]
 
 The source file path.
 
+### delimiter [string]
+
+Field delimiter, used to tell the connector how to split fields when reading text files.
+
+Defaults to `\001`, the same as Hive's default delimiter.
+
+### date_format [string]
+
+Date type format, used to tell the connector how to convert a string to a date. The following formats are supported:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+Defaults to `yyyy-MM-dd`.
+
+### datetime_format [string]
+
+Datetime type format, used to tell the connector how to convert a string to a datetime. The following formats are supported:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+Defaults to `yyyy-MM-dd HH:mm:ss`.
+
+### time_format [string]
+
+Time type format, used to tell the connector how to convert a string to a time. The following formats are supported:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+Defaults to `HH:mm:ss`.
+
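+For example, for a tab-separated text file, a minimal sketch (assuming a literal tab character separates the fields) would be:
+
+```hocon
+
+delimiter = "\t"
+
+```
+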
 ### type [string]
 
 File type, supported as the following file types:
@@ -87,13 +121,45 @@ connector will generate data as the following:
 
 If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically.
 
-If you assign file type to `text` `csv`, schema option not supported temporarily, but the subsequent features will support.
+If you assign the file type to `text` or `csv`, you can choose whether or not to specify the schema information.
+
+For example, suppose the upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
 
-Now connector will treat the upstream data as the following:
+```
+
+If you do not assign a data schema, the connector will treat the upstream data as the following:
+
+| content               |
+|-----------------------|
+| tyrantlucifer#26#male |
+
+If you assign a data schema, you should also assign the `delimiter` option, except for the CSV file type.
+
+You should assign the schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string 
+    }
+}
+
+```
+
+The connector will generate data as the following:
 
-| lines                             |
-|-----------------------------------|
-| The content of every line in file |
+| name          | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26  | male   |
 
 ### fs.defaultFS [string]
 
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index d4a019d19..2aa8d280c 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -26,17 +26,51 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 
 ## Options
 
-| name           | type   | required | default value |
-|--------------- |--------|----------|---------------|
-| path           | string | yes      | -             |
-| type           | string | yes      | -             |
-| schema         | config | no       | -             |
-| common-options |        | no       | -             |
+| name            | type   | required | default value       |
+|-----------------|--------|----------|---------------------|
+| path            | string | yes      | -                   |
+| type            | string | yes      | -                   |
+| delimiter       | string | no       | \001                |
+| date_format     | string | no       | yyyy-MM-dd          |
+| datetime_format | string | no       | yyyy-MM-dd HH:mm:ss |
+| time_format     | string | no       | HH:mm:ss            |
+| schema          | config | no       | -                   |
+| common-options  |        | no       | -                   |
 
 ### path [string]
 
 The source file path.
 
+### delimiter [string]
+
+Field delimiter, used to tell the connector how to split fields when reading text files.
+
+Defaults to `\001`, the same as Hive's default delimiter.
+
+### date_format [string]
+
+Date type format, used to tell the connector how to convert a string to a date. The following formats are supported:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+Defaults to `yyyy-MM-dd`.
+
+### datetime_format [string]
+
+Datetime type format, used to tell the connector how to convert a string to a datetime. The following formats are supported:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+Defaults to `yyyy-MM-dd HH:mm:ss`.
+
+### time_format [string]
+
+Time type format, used to tell the connector how to convert a string to a time. The following formats are supported:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+Defaults to `HH:mm:ss`.
+
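+For example, if a date column is written like `2022.10.09`, a minimal sketch would be:
+
+```hocon
+
+date_format = "yyyy.MM.dd"
+
+```
+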
 ### type [string]
 
 File type, supported as the following file types:
@@ -86,13 +120,44 @@ connector will generate data as the following:
 
 If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically.
 
-If you assign file type to `text` `csv`, schema option not supported temporarily, but the subsequent features will support.
+If you assign the file type to `text` or `csv`, you can choose whether or not to specify the schema information.
+
+For example, suppose the upstream data is the following:
+
+```text
 
-Now connector will treat the upstream data as the following:
+tyrantlucifer#26#male
+
+```
+
+If you do not assign a data schema, the connector will treat the upstream data as the following:
+
+| content               |
+|-----------------------|
+| tyrantlucifer#26#male |
+
+If you assign a data schema, you should also assign the `delimiter` option, except for the CSV file type.
+
+You should assign the schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string 
+    }
+}
+
+```
+
+The connector will generate data as the following:
 
-| lines                             |
-|-----------------------------------|
-| The content of every line in file |
+| name          | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26  | male   |
 
 ### schema [config]
 
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index a71bd41ad..6a0aace48 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -29,21 +29,55 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
 
 ## Options
 
-| name          | type   | required | default value |
-|---------------|--------|----------|---------------|
-| path          | string | yes      | -             |
-| type          | string | yes      | -             |
-| bucket        | string | yes      | -             |
-| access_key    | string | yes      | -             |
-| access_secret | string | yes      | -             |
-| endpoint      | string | yes      | -             |
-| schema        | config | no       | -             |
-| common-options|        | no       | -             |
+| name            | type   | required | default value       |
+|-----------------|--------|----------|---------------------|
+| path            | string | yes      | -                   |
+| type            | string | yes      | -                   |
+| bucket          | string | yes      | -                   |
+| access_key      | string | yes      | -                   |
+| access_secret   | string | yes      | -                   |
+| endpoint        | string | yes      | -                   |
+| delimiter       | string | no       | \001                |
+| date_format     | string | no       | yyyy-MM-dd          |
+| datetime_format | string | no       | yyyy-MM-dd HH:mm:ss |
+| time_format     | string | no       | HH:mm:ss            |
+| schema          | config | no       | -                   |
+| common-options  |        | no       | -                   |
 
 ### path [string]
 
 The source file path.
 
+### delimiter [string]
+
+Field delimiter, used to tell the connector how to split fields when reading text files.
+
+Defaults to `\001`, the same as Hive's default delimiter.
+
+### date_format [string]
+
+Date type format, used to tell the connector how to convert a string to a date. The following formats are supported:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+Defaults to `yyyy-MM-dd`.
+
+### datetime_format [string]
+
+Datetime type format, used to tell the connector how to convert a string to a datetime. The following formats are supported:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+Defaults to `yyyy-MM-dd HH:mm:ss`.
+
+### time_format [string]
+
+Time type format, used to tell the connector how to convert a string to a time. The following formats are supported:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+Defaults to `HH:mm:ss`.
+
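+For example, if a time column carries milliseconds like `21:49:28.123`, a minimal sketch would be:
+
+```hocon
+
+time_format = "HH:mm:ss.SSS"
+
+```
+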
 ### type [string]
 
 File type, supported as the following file types:
@@ -93,13 +127,44 @@ connector will generate data as the following:
 
 If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically.
 
-If you assign file type to `text` `csv`, schema option not supported temporarily, but the subsequent features will support.
+If you assign the file type to `text` or `csv`, you can choose whether or not to specify the schema information.
+
+For example, suppose the upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
 
-Now connector will treat the upstream data as the following:
+```
+
+If you do not assign a data schema, the connector will treat the upstream data as the following:
+
+| content               |
+|-----------------------|
+| tyrantlucifer#26#male |
+
+If you assign a data schema, you should also assign the `delimiter` option, except for the CSV file type.
+
+You should assign the schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+    fields {
+        name = string
+        age = int
+        gender = string 
+    }
+}
+
+```
+
+The connector will generate data as the following:
 
-| lines                             |
-|-----------------------------------|
-| The content of every line in file |
+| name          | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26  | male   |
 
 ### bucket [string]
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 329ac60f6..9a986d172 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
@@ -41,6 +42,7 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
         readStrategy = ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE));
+        readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH);
         hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS));
         try {
@@ -49,12 +51,26 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
         }
         // support user-defined schema
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE).toUpperCase());
+        // only json, text and csv types support user-defined schema now
         if (pluginConfig.hasPath(HdfsSourceConfig.SCHEMA)) {
-            Config schemaConfig = pluginConfig.getConfig(HdfsSourceConfig.SCHEMA);
-            rowType = SeaTunnelSchema
-                .buildWithConfig(schemaConfig)
-                .getSeaTunnelRowType();
-            readStrategy.setSeaTunnelRowTypeInfo(rowType);
+            switch (fileFormat) {
+                case CSV:
+                case TEXT:
+                case JSON:
+                    Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    rowType = SeaTunnelSchema
+                            .buildWithConfig(schemaConfig)
+                            .getSeaTunnelRowType();
+                    readStrategy.setSeaTunnelRowTypeInfo(rowType);
+                    break;
+                case ORC:
+                case PARQUET:
+                    throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                default:
+                    // should never be reached
+                    throw new UnsupportedOperationException("SeaTunnel does not support this file format");
+            }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index abd8f9931..ef1635d94 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -63,6 +63,12 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-format-text</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
             <artifactId>seatunnel-core-base</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index fcccba986..60617de72 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -21,4 +21,8 @@ public class BaseSourceConfig {
     public static final String FILE_TYPE = "type";
     public static final String FILE_PATH = "path";
     public static final String SCHEMA = "schema";
+    public static final String DELIMITER = "delimiter";
+    public static final String DATE_FORMAT = "date_format";
+    public static final String DATETIME_FORMAT = "datetime_format";
+    public static final String TIME_FORMAT = "time_format";
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 3be5771d3..55213d219 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -26,6 +26,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -41,6 +43,7 @@ import java.util.List;
 public abstract class AbstractReadStrategy implements ReadStrategy {
     protected HadoopConf hadoopConf;
     protected SeaTunnelRowType seaTunnelRowType;
+    protected Config pluginConfig;
 
     @Override
     public void init(HadoopConf conf) {
@@ -100,4 +103,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
         return fileNames;
     }
 
+    @Override
+    public void setPluginConfig(Config pluginConfig) {
+        this.pluginConfig = pluginConfig;
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index 4f512a7db..ee32bd182 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
 import org.apache.hadoop.conf.Configuration;
 
 import java.io.IOException;
@@ -41,4 +43,6 @@ public interface ReadStrategy extends Serializable {
     void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
 
     List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;
+
+    void setPluginConfig(Config pluginConfig);
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 61e6ede77..3ae05b86a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -17,13 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +41,11 @@ import java.io.InputStreamReader;
 import java.nio.charset.StandardCharsets;
 
 public class TextReadStrategy extends AbstractReadStrategy {
-
-    private static final String TEXT_FIELD_NAME = "lines";
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+    private String fieldDelimiter = String.valueOf('\001');
+    private DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
+    private DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+    private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
 
     @Override
     public void read(String path, Collector<SeaTunnelRow> output) throws IOException, FilePluginException {
@@ -44,13 +53,53 @@ public class TextReadStrategy extends AbstractReadStrategy {
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
         try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
-            reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new String[]{line})));
+            reader.lines().forEach(line -> {
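+                // deserialize each line into a SeaTunnelRow using the configured schema and delimiter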
+                try {
+                    deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), output);
+                } catch (IOException e) {
+                    String errorMsg = String.format("Deserializing data [%s] failed, please check the origin data", line);
+                    throw new RuntimeException(errorMsg, e);
+                }
+            });
         }
     }
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
-        return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
-                new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+        SeaTunnelRowType simpleSeaTunnelType = SeaTunnelSchema.buildSimpleTextSchema();
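+        // \002 is assumed not to appear in the data, so a whole line maps to the simple schema's single string field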
+        deserializationSchema = TextDeserializationSchema.builder()
+                .seaTunnelRowType(simpleSeaTunnelType)
+                .delimiter(String.valueOf('\002'))
+                .build();
+        return simpleSeaTunnelType;
+    }
+
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+        if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER)) {
+            fieldDelimiter = pluginConfig.getString(BaseSourceConfig.DELIMITER);
+        } else {
+            FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_TYPE).toUpperCase());
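+            // CSV files fall back to a comma delimiter when none is configured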
+            if (fileFormat == FileFormat.CSV) {
+                fieldDelimiter = ",";
+            }
+        }
+        if (pluginConfig.hasPath(BaseSourceConfig.DATE_FORMAT)) {
+            dateFormat = DateUtils.Formatter.parse(pluginConfig.getString(BaseSourceConfig.DATE_FORMAT));
+        }
+        if (pluginConfig.hasPath(BaseSourceConfig.DATETIME_FORMAT)) {
+            datetimeFormat = DateTimeUtils.Formatter.parse(pluginConfig.getString(BaseSourceConfig.DATETIME_FORMAT));
+        }
+        if (pluginConfig.hasPath(BaseSourceConfig.TIME_FORMAT)) {
+            timeFormat = TimeUtils.Formatter.parse(pluginConfig.getString(BaseSourceConfig.TIME_FORMAT));
+        }
+        deserializationSchema = TextDeserializationSchema.builder()
+                .seaTunnelRowType(seaTunnelRowType)
+                .delimiter(fieldDelimiter)
+                .dateFormatter(dateFormat)
+                .dateTimeFormatter(datetimeFormat)
+                .timeFormatter(timeFormat)
+                .build();
     }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index 4d7d3c8c1..fc6c4d420 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -57,6 +57,7 @@ public class FtpFileSource extends BaseFileSource {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Ftp file source connector only support read [text, csv, json] files");
         }
         readStrategy = ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_TYPE));
+        readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(FtpConfig.FILE_PATH);
         hadoopConf = FtpConf.buildWithConfig(pluginConfig);
         try {
@@ -66,12 +67,24 @@ public class FtpFileSource extends BaseFileSource {
         }
         // support user-defined schema
         // only json, text and csv types support user-defined schema now
-        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA) && fileFormat.equals(FileFormat.JSON)) {
-            Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
-            rowType = SeaTunnelSchema
-                    .buildWithConfig(schemaConfig)
-                    .getSeaTunnelRowType();
-            readStrategy.setSeaTunnelRowTypeInfo(rowType);
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+            switch (fileFormat) {
+                case CSV:
+                case TEXT:
+                case JSON:
+                    Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    rowType = SeaTunnelSchema
+                            .buildWithConfig(schemaConfig)
+                            .getSeaTunnelRowType();
+                    readStrategy.setSeaTunnelRowTypeInfo(rowType);
+                    break;
+                case ORC:
+                case PARQUET:
+                    throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                default:
+                    // should never be reached
+                    throw new UnsupportedOperationException("SeaTunnel does not support this file format");
+            }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index c4879482a..8cfe9e0df 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -53,6 +53,7 @@ public class LocalFileSource extends BaseFileSource {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
         readStrategy = ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_TYPE));
+        readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(LocalSourceConfig.FILE_PATH);
         hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
         try {
@@ -62,13 +63,25 @@ public class LocalFileSource extends BaseFileSource {
         }
         // support user-defined schema
         FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_TYPE).toUpperCase());
-        // only json type support user-defined schema now
-        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA) && fileFormat.equals(FileFormat.JSON)) {
-            Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
-            rowType = SeaTunnelSchema
-                    .buildWithConfig(schemaConfig)
-                    .getSeaTunnelRowType();
-            readStrategy.setSeaTunnelRowTypeInfo(rowType);
+        // only json, text and csv types support user-defined schema now
+        if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+            switch (fileFormat) {
+                case CSV:
+                case TEXT:
+                case JSON:
+                    Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    rowType = SeaTunnelSchema
+                            .buildWithConfig(schemaConfig)
+                            .getSeaTunnelRowType();
+                    readStrategy.setSeaTunnelRowTypeInfo(rowType);
+                    break;
+                case ORC:
+                case PARQUET:
+                    throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                default:
+                    // should never be reached
+                    throw new UnsupportedOperationException("SeaTunnel does not support this file format");
+            }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 2008dcc1c..a2bd20c0e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
@@ -53,6 +54,7 @@ public class OssFileSource extends BaseFileSource {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
         }
         readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE));
+        readStrategy.setPluginConfig(pluginConfig);
         String path = pluginConfig.getString(OssConfig.FILE_PATH);
         hadoopConf = OssConf.buildWithConfig(pluginConfig);
         try {
@@ -61,12 +63,26 @@ public class OssFileSource extends BaseFileSource {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
         }
         // support user-defined schema
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE).toUpperCase());
+        // only json, text and csv types support user-defined schema now
         if (pluginConfig.hasPath(OssConfig.SCHEMA)) {
-            Config schemaConfig = pluginConfig.getConfig(OssConfig.SCHEMA);
-            rowType = SeaTunnelSchema
-                    .buildWithConfig(schemaConfig)
-                    .getSeaTunnelRowType();
-            readStrategy.setSeaTunnelRowTypeInfo(rowType);
+            switch (fileFormat) {
+                case CSV:
+                case TEXT:
+                case JSON:
+                    Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+                    rowType = SeaTunnelSchema
+                            .buildWithConfig(schemaConfig)
+                            .getSeaTunnelRowType();
+                    readStrategy.setSeaTunnelRowTypeInfo(rowType);
+                    break;
+                case ORC:
+                case PARQUET:
+                    throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                default:
+                    // should never be reached
+                    throw new UnsupportedOperationException("SeaTunnel does not support this file format");
+            }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));