You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/19 09:37:21 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Local file json support (#2465)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 65a92f249 [Feature][Connector-V2] Local file json support (#2465)
65a92f249 is described below

commit 65a92f24965ac17d1adec3dc475206ede5dcd7b4
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Fri Aug 19 17:37:15 2022 +0800

    [Feature][Connector-V2] Local file json support (#2465)
    
    * [Feature][Connector-V2] Local file source add json support
    
    * [Feature][Connector-V2] Update local file source doc
    
    * [Feature][Connector-V2] Optimize schema config and add check logic for user-defined schema
---
 docs/en/connector-v2/source/LocalFile.md           | 72 ++++++++++++++++++++--
 .../seatunnel/common/schema/SeatunnelSchema.java   |  1 +
 .../file/local/source/LocalFileSource.java         | 21 +++++--
 3 files changed, 85 insertions(+), 9 deletions(-)

diff --git a/docs/en/connector-v2/source/LocalFile.md b/docs/en/connector-v2/source/LocalFile.md
index f725186fc..e6ac6142f 100644
--- a/docs/en/connector-v2/source/LocalFile.md
+++ b/docs/en/connector-v2/source/LocalFile.md
@@ -8,10 +8,11 @@ Read data from local file system.
 
 ## Options
 
-| name         | type   | required | default value |
-|--------------| ------ |----------|---------------|
-| path         | string | yes      | -             |
-| type         | string | yes      | -             |
+| name   | type   | required | default value |
+|--------|--------|----------|---------------|
+| path   | string | yes      | -             |
+| type   | string | yes      | -             |
+| schema | config | no       | -             |
 
 ### path [string]
 
@@ -23,13 +24,74 @@ File type, supported as the following file types:
 
 `text` `csv` `parquet` `orc` `json`
 
+If you set the file type to `json`, you should also set the `schema` option to tell the connector how to parse the data into the rows you want.
+
+For example:
+
+Suppose the upstream data is as follows:
+
+```json
+
+{"code":  200, "data":  "get success", "success":  true}
+
+```
+
+You should set the schema as follows:
+
+```hocon
+
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
+
+The connector will then generate the following data:
+
+| code | data        | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+If you set the file type to `parquet` or `orc`, the schema option is not required; the connector can infer the schema of the upstream data automatically.
+
+If you set the file type to `text` or `csv`, the schema option is not supported yet; support will be added in a future release.
+
+For now, the connector treats the upstream data as follows:
+
+| lines                             |
+|-----------------------------------|
+| The content of every line in file |
+
+### schema [config]
+
+The schema of the upstream data. Currently it only takes effect when `type` is `json`.
+
 ## Example
 
-```hcon
+```hocon
 
 LocalFile {
   path = "/apps/hive/demo/student"
   type = "parquet"
 }
 
+```
+
+```hocon
+
+LocalFile {
+  schema {
+    fields {
+      name = string
+      age = int
+    }
+  }
+  path = "/apps/hive/demo/student"
+  type = "json"
+}
+
 ```
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
index 57beaa8de..4c4799b21 100644
--- a/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
+++ b/seatunnel-connectors-v2/connector-common/src/main/java/org/apache/seatunnel/connectors/seatunnel/common/schema/SeatunnelSchema.java
@@ -35,6 +35,7 @@ import org.apache.seatunnel.shade.com.typesafe.config.ConfigRenderOptions;
 import java.util.Map;
 
 public class SeatunnelSchema {
+    public static final String SCHEMA = "schema";
     private static final String FIELD_KEY = "fields";
     private static final String SIMPLE_SCHEMA_FILED = "content";
     private final SeaTunnelRowType seaTunnelRowType;
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 58d2083b9..35953113a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
+import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 import org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig;
@@ -56,10 +58,21 @@ public class LocalFileSource extends BaseFileSource {
         } catch (IOException e) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
         }
-        try {
-            rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
-        } catch (FilePluginException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+        // support user-defined schema
+        FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_PATH).toUpperCase());
+        // only json type support user-defined schema now
+        if (pluginConfig.hasPath(SeatunnelSchema.SCHEMA) && fileFormat.equals(FileFormat.JSON)) {
+            Config schemaConfig = pluginConfig.getConfig(SeatunnelSchema.SCHEMA);
+            rowType = SeatunnelSchema
+                    .buildWithConfig(schemaConfig)
+                    .getSeaTunnelRowType();
+            readStrategy.setSeaTunnelRowTypeInfo(rowType);
+        } else {
+            try {
+                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
+            } catch (FilePluginException e) {
+                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+            }
         }
     }
 }