You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by zo...@apache.org on 2022/10/09 13:49:35 UTC
[incubator-seatunnel] branch dev updated: [Improve][connector][file] Support user-defined schema for reading text file (#2976)
This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 1c05ee0d7 [Improve][connector][file] Support user-defined schema for reading text file (#2976)
1c05ee0d7 is described below
commit 1c05ee0d7ee00f76c74ddfc17aebdfbd75aa733b
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Sun Oct 9 21:49:28 2022 +0800
[Improve][connector][file] Support user-defined schema for reading text file (#2976)
---
docs/en/connector-v2/source/FtpFile.md | 107 +++++++++++++++++----
docs/en/connector-v2/source/HdfsFile.md | 90 ++++++++++++++---
docs/en/connector-v2/source/LocalFile.md | 87 ++++++++++++++---
docs/en/connector-v2/source/OssFile.md | 95 +++++++++++++++---
.../file/hdfs/source/BaseHdfsFileSource.java | 26 ++++-
.../connector-file/connector-file-base/pom.xml | 6 ++
.../seatunnel/file/config/BaseSourceConfig.java | 4 +
.../file/source/reader/AbstractReadStrategy.java | 7 ++
.../seatunnel/file/source/reader/ReadStrategy.java | 4 +
.../file/source/reader/TextReadStrategy.java | 63 ++++++++++--
.../seatunnel/file/ftp/source/FtpFileSource.java | 25 +++--
.../file/local/source/LocalFileSource.java | 27 ++++--
.../seatunnel/file/oss/source/OssFileSource.java | 26 ++++-
13 files changed, 480 insertions(+), 87 deletions(-)
diff --git a/docs/en/connector-v2/source/FtpFile.md b/docs/en/connector-v2/source/FtpFile.md
index 07a7f9134..0513cc1c5 100644
--- a/docs/en/connector-v2/source/FtpFile.md
+++ b/docs/en/connector-v2/source/FtpFile.md
@@ -21,16 +21,20 @@ Read data from ftp file server.
## Options
-| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| host | string | yes | - |
-| port | int | yes | - |
-| user | string | yes | - |
-| password | string | yes | - |
-| path | string | yes | - |
-| type | string | yes | - |
-| schema | config | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|-----------------|---------|----------|---------------------|
+| host | string | yes | - |
+| port | int | yes | - |
+| user | string | yes | - |
+| password | string | yes | - |
+| path | string | yes | - |
+| type | string | yes | - |
+| delimiter | string | no | \001 |
+| date_format | string | no | yyyy-MM-dd |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
+| time_format | string | no | HH:mm:ss |
+| schema | config | no | - |
+| common-options | | no | - |
### host [string]
@@ -52,6 +56,40 @@ The target ftp password is required
The source file path.
+### delimiter [string]
+
+Field delimiter, used to tell connector how to slice and dice fields when reading text files
+
+default `\001`, the same as hive's default delimiter
+
+### date_format [string]
+
+Date type format, used to tell connector how to convert string to date, supported as the following formats:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+default `yyyy-MM-dd`
+
+### datetime_format [string]
+
+Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+default `yyyy-MM-dd HH:mm:ss`
+
+### time_format [string]
+
+Time type format, used to tell connector how to convert string to time, supported as the following formats:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+default `HH:mm:ss`
+
+### schema [config]
+
+The schema information of upstream data.
+
### type [string]
File type, supported as the following file types:
@@ -90,19 +128,45 @@ connector will generate data as the following:
|------|-------------|---------|
| 200 | get success | true |
-If you assign file type to `text` `csv`, schema option not supported temporarily, but the subsequent features will support.
+If you assign file type to `text` `csv`, you can choose to specify the schema information or not.
-Now connector will treat the upstream data as the following:
+For example, upstream data is the following:
-| lines |
-|-----------------------------------|
-| The content of every line in file |
+```text
-### schema [config]
+tyrantlucifer#26#male
-#### fields [Config]
+```
+
+If you do not assign the data schema, the connector will treat the upstream data as the following:
+
+| content |
+|------------------------|
+| tyrantlucifer#26#male |
+
+If you assign the data schema, you should also assign the option `delimiter`, except for the CSV file type
-The schema information of upstream data.
+
+You should assign the schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+ fields {
+ name = string
+ age = int
+ gender = string
+ }
+}
+
+```
+
+connector will generate data as the following:
+
+| name | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26 | male |
### common options
@@ -113,12 +177,17 @@ Source plugin common parameters, please refer to [Source Common Options](common-
```hocon
FtpFile {
- path = "/tmp/seatunnel/sink/parquet"
+ path = "/tmp/seatunnel/sink/text"
host = "192.168.31.48"
port = 21
user = tyrantlucifer
password = tianchao
type = "text"
+    schema = {
+      fields {
+        name = string
+        age = int
+      }
+    }
+ delimiter = "#"
}
```
diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 25e3e5fc6..842d2ac5d 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -26,18 +26,52 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
## Options
-| name | type | required | default value |
-| -------------- | ------ | -------- | ------------- |
-| path | string | yes | - |
-| type | string | yes | - |
-| fs.defaultFS | string | yes | - |
-| schema | config | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|-----------------|--------|----------|---------------------|
+| path | string | yes | - |
+| type | string | yes | - |
+| fs.defaultFS | string | yes | - |
+| delimiter | string | no | \001 |
+| date_format | string | no | yyyy-MM-dd |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
+| time_format | string | no | HH:mm:ss |
+| schema | config | no | - |
+| common-options | | no | - |
### path [string]
The source file path.
+### delimiter [string]
+
+Field delimiter, used to tell connector how to slice and dice fields when reading text files
+
+default `\001`, the same as hive's default delimiter
+
+### date_format [string]
+
+Date type format, used to tell connector how to convert string to date, supported as the following formats:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+default `yyyy-MM-dd`
+
+### datetime_format [string]
+
+Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+default `yyyy-MM-dd HH:mm:ss`
+
+### time_format [string]
+
+Time type format, used to tell connector how to convert string to time, supported as the following formats:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+default `HH:mm:ss`
+
### type [string]
File type, supported as the following file types:
@@ -87,13 +121,45 @@ connector will generate data as the following:
If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically.
-If you assign file type to `text` `csv`, schema option not supported temporarily, but the subsequent features will support.
+If you assign file type to `text` `csv`, you can choose to specify the schema information or not.
+
+For example, upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
-Now connector will treat the upstream data as the following:
+```
+
+If you do not assign the data schema, the connector will treat the upstream data as the following:
+
+| content |
+|------------------------|
+| tyrantlucifer#26#male |
+
+If you assign the data schema, you should also assign the option `delimiter`, except for the CSV file type
+
+
+You should assign the schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+ fields {
+ name = string
+ age = int
+ gender = string
+ }
+}
+
+```
+
+connector will generate data as the following:
-| lines |
-|-----------------------------------|
-| The content of every line in file |
+| name | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26 | male |
### fs.defaultFS [string]
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index d4a019d19..2aa8d280c 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -26,17 +26,51 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
## Options
-| name | type | required | default value |
-|--------------- |--------|----------|---------------|
-| path | string | yes | - |
-| type | string | yes | - |
-| schema | config | no | - |
-| common-options | | no | - |
+| name | type | required | default value |
+|-----------------|--------|----------|---------------------|
+| path | string | yes | - |
+| type | string | yes | - |
+| delimiter | string | no | \001 |
+| date_format | string | no | yyyy-MM-dd |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
+| time_format | string | no | HH:mm:ss |
+| schema | config | no | - |
+| common-options | | no | - |
### path [string]
The source file path.
+### delimiter [string]
+
+Field delimiter, used to tell connector how to slice and dice fields when reading text files
+
+default `\001`, the same as hive's default delimiter
+
+### date_format [string]
+
+Date type format, used to tell connector how to convert string to date, supported as the following formats:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+default `yyyy-MM-dd`
+
+### datetime_format [string]
+
+Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+default `yyyy-MM-dd HH:mm:ss`
+
+### time_format [string]
+
+Time type format, used to tell connector how to convert string to time, supported as the following formats:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+default `HH:mm:ss`
+
### type [string]
File type, supported as the following file types:
@@ -86,13 +120,44 @@ connector will generate data as the following:
If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically.
-If you assign file type to `text` `csv`, schema option not supported temporarily, but the subsequent features will support.
+If you assign file type to `text` `csv`, you can choose to specify the schema information or not.
+
+For example, upstream data is the following:
+
+```text
-Now connector will treat the upstream data as the following:
+tyrantlucifer#26#male
+
+```
+
+If you do not assign the data schema, the connector will treat the upstream data as the following:
+
+| content |
+|------------------------|
+| tyrantlucifer#26#male |
+
+If you assign the data schema, you should also assign the option `delimiter`, except for the CSV file type
+
+You should assign the schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+ fields {
+ name = string
+ age = int
+ gender = string
+ }
+}
+
+```
+
+connector will generate data as the following:
-| lines |
-|-----------------------------------|
-| The content of every line in file |
+| name | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26 | male |
### schema [config]
diff --git a/docs/en/connector-v2/source/OssFile.md b/docs/en/connector-v2/source/OssFile.md
index a71bd41ad..6a0aace48 100644
--- a/docs/en/connector-v2/source/OssFile.md
+++ b/docs/en/connector-v2/source/OssFile.md
@@ -29,21 +29,55 @@ Read all the data in a split in a pollNext call. What splits are read will be sa
## Options
-| name | type | required | default value |
-|---------------|--------|----------|---------------|
-| path | string | yes | - |
-| type | string | yes | - |
-| bucket | string | yes | - |
-| access_key | string | yes | - |
-| access_secret | string | yes | - |
-| endpoint | string | yes | - |
-| schema | config | no | - |
-| common-options| | no | - |
+| name | type | required | default value |
+|-----------------|--------|----------|---------------------|
+| path | string | yes | - |
+| type | string | yes | - |
+| bucket | string | yes | - |
+| access_key | string | yes | - |
+| access_secret | string | yes | - |
+| endpoint | string | yes | - |
+| delimiter | string | no | \001 |
+| date_format | string | no | yyyy-MM-dd |
+| datetime_format | string | no | yyyy-MM-dd HH:mm:ss |
+| time_format | string | no | HH:mm:ss |
+| schema | config | no | - |
+| common-options | | no | - |
### path [string]
The source file path.
+### delimiter [string]
+
+Field delimiter, used to tell connector how to slice and dice fields when reading text files
+
+default `\001`, the same as hive's default delimiter
+
+### date_format [string]
+
+Date type format, used to tell connector how to convert string to date, supported as the following formats:
+
+`yyyy-MM-dd` `yyyy.MM.dd` `yyyy/MM/dd`
+
+default `yyyy-MM-dd`
+
+### datetime_format [string]
+
+Datetime type format, used to tell connector how to convert string to datetime, supported as the following formats:
+
+`yyyy-MM-dd HH:mm:ss` `yyyy.MM.dd HH:mm:ss` `yyyy/MM/dd HH:mm:ss` `yyyyMMddHHmmss`
+
+default `yyyy-MM-dd HH:mm:ss`
+
+### time_format [string]
+
+Time type format, used to tell connector how to convert string to time, supported as the following formats:
+
+`HH:mm:ss` `HH:mm:ss.SSS`
+
+default `HH:mm:ss`
+
### type [string]
File type, supported as the following file types:
@@ -93,13 +127,44 @@ connector will generate data as the following:
If you assign file type to `parquet` `orc`, schema option not required, connector can find the schema of upstream data automatically.
-If you assign file type to `text` `csv`, schema option not supported temporarily, but the subsequent features will support.
+If you assign file type to `text` `csv`, you can choose to specify the schema information or not.
+
+For example, upstream data is the following:
+
+```text
+
+tyrantlucifer#26#male
-Now connector will treat the upstream data as the following:
+```
+
+If you do not assign the data schema, the connector will treat the upstream data as the following:
+
+| content |
+|------------------------|
+| tyrantlucifer#26#male |
+
+If you assign the data schema, you should also assign the option `delimiter`, except for the CSV file type
+
+You should assign the schema and delimiter as the following:
+
+```hocon
+
+delimiter = "#"
+schema {
+ fields {
+ name = string
+ age = int
+ gender = string
+ }
+}
+
+```
+
+connector will generate data as the following:
-| lines |
-|-----------------------------------|
-| The content of every line in file |
+| name | age | gender |
+|---------------|-----|--------|
+| tyrantlucifer | 26 | male |
### bucket [string]
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 329ac60f6..9a986d172 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
import org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
@@ -41,6 +42,7 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
}
readStrategy = ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE));
+ readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(HdfsSourceConfig.FILE_PATH);
hadoopConf = new HadoopConf(pluginConfig.getString(HdfsSourceConfig.DEFAULT_FS));
try {
@@ -49,12 +51,26 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
}
// support user-defined schema
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE).toUpperCase());
+ // only json text csv type support user-defined schema now
if (pluginConfig.hasPath(HdfsSourceConfig.SCHEMA)) {
- Config schemaConfig = pluginConfig.getConfig(HdfsSourceConfig.SCHEMA);
- rowType = SeaTunnelSchema
- .buildWithConfig(schemaConfig)
- .getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ switch (fileFormat) {
+ case CSV:
+ case TEXT:
+ case JSON:
+ Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ rowType = SeaTunnelSchema
+ .buildWithConfig(schemaConfig)
+ .getSeaTunnelRowType();
+ readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ break;
+ case ORC:
+ case PARQUET:
+ throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+ default:
+ // never got in there
+ throw new UnsupportedOperationException("SeaTunnel does not supported this file format");
+ }
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index abd8f9931..ef1635d94 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -63,6 +63,12 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-text</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-core-base</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index fcccba986..60617de72 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -21,4 +21,8 @@ public class BaseSourceConfig {
public static final String FILE_TYPE = "type";
public static final String FILE_PATH = "path";
public static final String SCHEMA = "schema";
+ public static final String DELIMITER = "delimiter";
+ public static final String DATE_FORMAT = "date_format";
+ public static final String DATETIME_FORMAT = "datetime_format";
+ public static final String TIME_FORMAT = "time_format";
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 3be5771d3..55213d219 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -26,6 +26,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
@@ -41,6 +43,7 @@ import java.util.List;
public abstract class AbstractReadStrategy implements ReadStrategy {
protected HadoopConf hadoopConf;
protected SeaTunnelRowType seaTunnelRowType;
+ protected Config pluginConfig;
@Override
public void init(HadoopConf conf) {
@@ -100,4 +103,8 @@ public abstract class AbstractReadStrategy implements ReadStrategy {
return fileNames;
}
+ @Override
+ public void setPluginConfig(Config pluginConfig) {
+ this.pluginConfig = pluginConfig;
+ }
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index 4f512a7db..ee32bd182 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
import org.apache.hadoop.conf.Configuration;
import java.io.IOException;
@@ -41,4 +43,6 @@ public interface ReadStrategy extends Serializable {
void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;
+
+ void setPluginConfig(Config pluginConfig);
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 61e6ede77..3ae05b86a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -17,13 +17,19 @@
package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.table.type.BasicType;
-import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.utils.DateTimeUtils;
+import org.apache.seatunnel.common.utils.DateUtils;
+import org.apache.seatunnel.common.utils.TimeUtils;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.format.text.TextDeserializationSchema;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
@@ -35,8 +41,11 @@ import java.io.InputStreamReader;
import java.nio.charset.StandardCharsets;
public class TextReadStrategy extends AbstractReadStrategy {
-
- private static final String TEXT_FIELD_NAME = "lines";
+ private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ private String fieldDelimiter = String.valueOf('\001');
+ private DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
+ private DateTimeUtils.Formatter datetimeFormat = DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
+ private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
@Override
public void read(String path, Collector<SeaTunnelRow> output) throws IOException, FilePluginException {
@@ -44,13 +53,53 @@ public class TextReadStrategy extends AbstractReadStrategy {
FileSystem fs = FileSystem.get(conf);
Path filePath = new Path(path);
try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
- reader.lines().forEach(line -> output.collect(new SeaTunnelRow(new String[]{line})));
+ reader.lines().forEach(line -> {
+ try {
+ deserializationSchema.deserialize(line.getBytes(), output);
+ } catch (IOException e) {
+ String errorMsg = String.format("Deserialize this data [%s] error, please check the origin data", line);
+ throw new RuntimeException(errorMsg);
+ }
+ });
}
}
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) {
- return new SeaTunnelRowType(new String[]{TEXT_FIELD_NAME},
- new SeaTunnelDataType<?>[]{BasicType.STRING_TYPE});
+ SeaTunnelRowType simpleSeaTunnelType = SeaTunnelSchema.buildSimpleTextSchema();
+ deserializationSchema = TextDeserializationSchema.builder()
+ .seaTunnelRowType(simpleSeaTunnelType)
+ .delimiter(String.valueOf('\002'))
+ .build();
+ return simpleSeaTunnelType;
+ }
+
+ @Override
+ public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ if (pluginConfig.hasPath(BaseSourceConfig.DELIMITER)) {
+ fieldDelimiter = pluginConfig.getString(BaseSourceConfig.DELIMITER);
+ } else {
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(BaseSourceConfig.FILE_TYPE));
+ if (fileFormat == FileFormat.CSV) {
+ fieldDelimiter = ",";
+ }
+ }
+ if (pluginConfig.hasPath(BaseSourceConfig.DATE_FORMAT)) {
+ dateFormat = DateUtils.Formatter.parse(pluginConfig.getString(BaseSourceConfig.DATE_FORMAT));
+ }
+ if (pluginConfig.hasPath(BaseSourceConfig.DATETIME_FORMAT)) {
+ datetimeFormat = DateTimeUtils.Formatter.parse(pluginConfig.getString(BaseSourceConfig.DATETIME_FORMAT));
+ }
+ if (pluginConfig.hasPath(BaseSourceConfig.TIME_FORMAT)) {
+ timeFormat = TimeUtils.Formatter.parse(pluginConfig.getString(BaseSourceConfig.TIME_FORMAT));
+ }
+ deserializationSchema = TextDeserializationSchema.builder()
+ .seaTunnelRowType(seaTunnelRowType)
+ .delimiter(fieldDelimiter)
+ .dateFormatter(dateFormat)
+ .dateTimeFormatter(datetimeFormat)
+ .timeFormatter(timeFormat)
+ .build();
}
}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index 4d7d3c8c1..fc6c4d420 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -57,6 +57,7 @@ public class FtpFileSource extends BaseFileSource {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Ftp file source connector only support read [text, csv, json] files");
}
readStrategy = ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_TYPE));
+ readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(FtpConfig.FILE_PATH);
hadoopConf = FtpConf.buildWithConfig(pluginConfig);
try {
@@ -66,12 +67,24 @@ public class FtpFileSource extends BaseFileSource {
}
// support user-defined schema
// only json type support user-defined schema now
- if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA) && fileFormat.equals(FileFormat.JSON)) {
- Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
- rowType = SeaTunnelSchema
- .buildWithConfig(schemaConfig)
- .getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+ switch (fileFormat) {
+ case CSV:
+ case TEXT:
+ case JSON:
+ Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ rowType = SeaTunnelSchema
+ .buildWithConfig(schemaConfig)
+ .getSeaTunnelRowType();
+ readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ break;
+ case ORC:
+ case PARQUET:
+ throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+ default:
+ // never got in there
+ throw new UnsupportedOperationException("SeaTunnel does not supported this file format");
+ }
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index c4879482a..8cfe9e0df 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -53,6 +53,7 @@ public class LocalFileSource extends BaseFileSource {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
}
readStrategy = ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_TYPE));
+ readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(LocalSourceConfig.FILE_PATH);
hadoopConf = new LocalConf(CommonConfigurationKeys.FS_DEFAULT_NAME_DEFAULT);
try {
@@ -62,13 +63,25 @@ public class LocalFileSource extends BaseFileSource {
}
// support user-defined schema
FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_TYPE).toUpperCase());
- // only json type support user-defined schema now
- if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA) && fileFormat.equals(FileFormat.JSON)) {
- Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
- rowType = SeaTunnelSchema
- .buildWithConfig(schemaConfig)
- .getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ // only json text csv type support user-defined schema now
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+ switch (fileFormat) {
+ case CSV:
+ case TEXT:
+ case JSON:
+ Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ rowType = SeaTunnelSchema
+ .buildWithConfig(schemaConfig)
+ .getSeaTunnelRowType();
+ readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ break;
+ case ORC:
+ case PARQUET:
+ throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+ default:
+ // never got in there
+ throw new UnsupportedOperationException("SeaTunnel does not supported this file format");
+ }
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index 2008dcc1c..a2bd20c0e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
@@ -53,6 +54,7 @@ public class OssFileSource extends BaseFileSource {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
}
readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE));
+ readStrategy.setPluginConfig(pluginConfig);
String path = pluginConfig.getString(OssConfig.FILE_PATH);
hadoopConf = OssConf.buildWithConfig(pluginConfig);
try {
@@ -61,12 +63,26 @@ public class OssFileSource extends BaseFileSource {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
}
// support user-defined schema
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE).toUpperCase());
+ // only json text csv type support user-defined schema now
if (pluginConfig.hasPath(OssConfig.SCHEMA)) {
- Config schemaConfig = pluginConfig.getConfig(OssConfig.SCHEMA);
- rowType = SeaTunnelSchema
- .buildWithConfig(schemaConfig)
- .getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ switch (fileFormat) {
+ case CSV:
+ case TEXT:
+ case JSON:
+ Config schemaConfig = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ rowType = SeaTunnelSchema
+ .buildWithConfig(schemaConfig)
+ .getSeaTunnelRowType();
+ readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ break;
+ case ORC:
+ case PARQUET:
+ throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+ default:
+ // never got in there
+ throw new UnsupportedOperationException("SeaTunnel does not supported this file format");
+ }
} else {
try {
rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));