You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/19 09:37:21 UTC
[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Local file json support (#2465)
This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 65a92f249 [Feature][Connector-V2] Local file json support (#2465)
65a92f249 is described below
commit 65a92f24965ac17d1adec3dc475206ede5dcd7b4
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Fri Aug 19 17:37:15 2022 +0800
[Feature][Connector-V2] Local file json support (#2465)
* [Feature][Connector-V2] Local file source add json support
* [Feature][Connector-V2] Update local file source doc
* [Feature][Connector-V2] Optimize schema config and add check logic for user-defined schema
---
docs/en/connector-v2/source/LocalFile.md | 72 ++++++++++++++++++++--
.../seatunnel/common/schema/SeatunnelSchema.java | 1 +
.../file/local/source/LocalFileSource.java | 21 +++++--
3 files changed, 85 insertions(+), 9 deletions(-)
diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index f725186fc..e6ac6142f 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -8,10 +8,11 @@ Read data from local file system.
## Options
-| name | type | required | default value |
-|--------------| ------ |----------|---------------|
-| path | string | yes | - |
-| type | string | yes | - |
+| name | type | required | default value |
+|--------|--------|----------|---------------|
+| path | string | yes | - |
+| type | string | yes | - |
+| schema | config | no | - |
### path [string]
@@ -23,13 +24,74 @@ File type, supported as the following file types:
`text` `csv` `parquet` `orc` `json`
+If you set the file type to `json`, you should also set the `schema` option to tell the connector how to parse the data into the rows you want.
+
+For example:
+
+Suppose the upstream data is as follows:
+
+```json
+
+{"code": 200, "data": "get success", "success": true}
+
+```
+
+You should set the schema as follows:
+
+```hocon
+
+schema {
+ fields {
+ code = int
+ data = string
+ success = boolean
+ }
+}
+
+```
+
+The connector will then generate the following data:
+
+| code | data | success |
+|------|-------------|---------|
+| 200 | get success | true |
+
+If you set the file type to `parquet` or `orc`, the schema option is not required; the connector can determine the schema of the upstream data automatically.
+
+If you set the file type to `text` or `csv`, the schema option is not supported yet; support is planned for a future release.
+
+For now, the connector treats the upstream data as follows:
+
+| lines |
+|-----------------------------------|
+| The content of every line in file |
+
+### schema [config]
+
+The schema of the upstream data. Currently it is only used when `type` is `json`.
+
## Example
-```hcon
+```hocon
LocalFile {
path = "/apps/hive/demo/student"
type = "parquet"
}
+```
+
+```hocon
+
+LocalFile {
+ schema {
+ fields {
+ name = string
+ age = int
+ }
+ }
+ path = "/apps/hive/demo/student"
+ type = "json"
+}
+
```
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
index 57beaa8de..4c4799b21 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
import java.util.Map;
public class SeatunnelSchema {
+ public static final String SCHEMA = "schema";
private static final String FIELD_KEY = "fields";
private static final String SIMPLE_SCHEMA_FILED = "content";
private final SeaTunnelRowType seaTunnelRowType;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 58d2083b9..35953113a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig;
@@ -56,10 +58,21 @@ public class LocalFileSource extends BaseFileSource {
} catch (IOException e) {
throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
}
- try {
- rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
- } catch (FilePluginException e) {
- throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+ // support user-defined schema
+ FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_PATH).toUpperCase());
+ // only json type support user-defined schema now
+ if (pluginConfig.hasPath(SeatunnelSchema.SCHEMA) && fileFormat.equals(FileFormat.JSON)) {
+ Config schemaConfig = pluginConfig.getConfig(SeatunnelSchema.SCHEMA);
+ rowType = SeatunnelSchema
+ .buildWithConfig(schemaConfig)
+ .getSeaTunnelRowType();
+ readStrategy.setSeaTunnelRowTypeInfo(rowType);
+ } else {
+ try {
+ rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
+ } catch (FilePluginException e) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+ }
}
}
}