You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@seatunnel.apache.org by ki...@apache.org on 2022/08/18 02:03:58 UTC

[incubator-seatunnel] branch dev updated: [Feature][Connector-V2] Add hdfs file json support (#2451)

This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 84f6b17c1 [Feature][Connector-V2] Add hdfs file json support (#2451)
84f6b17c1 is described below

commit 84f6b17c15ac3f82c010322ce2b74ee74b501ac9
Author: TyrantLucifer <Ty...@gmail.com>
AuthorDate: Thu Aug 18 10:03:53 2022 +0800

    [Feature][Connector-V2] Add hdfs file json support (#2451)
    
    * [Feature][Connector-V2] Support json format for hdfs source
    
    * [Feature][Connector-V2] Add json read strategy
    
    * [Feature][Connector-V2] Update hdfs file source doc
---
 docs/en/connector-v2/source/HdfsFile.md            | 71 ++++++++++++++++++++--
 .../connector-file/connector-file-base/pom.xml     |  6 ++
 .../seatunnel/file/config/BaseSourceConfig.java    |  1 +
 .../seatunnel/file/config/FileFormat.java          |  3 +-
 .../file/source/reader/AbstractReadStrategy.java   |  9 ++-
 .../file/source/reader/JsonReadStrategy.java       | 66 ++++++++++++++++++++
 .../seatunnel/file/source/reader/ReadStrategy.java |  2 +
 .../seatunnel/file/hdfs/source/HdfsFileSource.java | 18 ++++--
 8 files changed, 164 insertions(+), 12 deletions(-)

diff --git a/docs/en/connector-v2/source/HdfsFile.md b/docs/en/connector-v2/source/HdfsFile.md
index 6f8d4df83..00bbe5fdd 100644
--- a/docs/en/connector-v2/source/HdfsFile.md
+++ b/docs/en/connector-v2/source/HdfsFile.md
@@ -8,11 +8,12 @@ Read data from hdfs file system.
 
 ## Options
 
-| name         | type   | required | default value |
-|--------------| ------ |----------|---------------|
-| path         | string | yes      | -             |
-| type         | string | yes      | -             |
-| fs.defaultFS | string | yes      | -             |
+| name          | type   | required | default value |
+|---------------|--------|----------|---------------|
+| path          | string | yes      | -             |
+| type          | string | yes      | -             |
+| fs.defaultFS  | string | yes      | -             |
+| schema        | config | no       | -             |
 
 ### path [string]
 
@@ -24,13 +25,55 @@ File type, supported as the following file types:
 
 `text` `csv` `parquet` `orc` `json`
 
+If you set the file type to `json`, you should also set the `schema` option to tell the connector how to parse the data into the rows you want.
+
+For example:
+
+upstream data is the following:
+
+```json
+
+{"code":  200, "data":  "get success", "success":  true}
+
+```
+
+you should define the schema as follows:
+
+```hocon
+
+schema {
+    fields {
+        code = int
+        data = string
+        success = boolean
+    }
+}
+
+```
+
+the connector will generate data as follows:
+
+| code | data        | success |
+|------|-------------|---------|
+| 200  | get success | true    |
+
+If you set the file type to `parquet` or `orc`, the `schema` option is not required; the connector can find the schema of the upstream data automatically.
+
+If you set the file type to `text` or `csv`, the `schema` option is not supported for now, but support will be added by a subsequent feature.
+
+For now, the connector will treat the upstream data as follows:
+
+| lines                             |
+|-----------------------------------|
+| The content of every line in file |
+
 ### fs.defaultFS [string]
 
 Hdfs cluster address.
 
 ## Example
 
-```hcon
+```hocon
 
 HdfsFile {
   path = "/apps/hive/demo/student"
@@ -38,4 +81,20 @@ HdfsFile {
   fs.defaultFS = "hdfs://namenode001"
 }
 
+```
+
+```hocon
+
+HdfsFile {
+  schema {
+    fields {
+      name = string
+      age = int
+    }
+  }
+  path = "/apps/hive/demo/student"
+  type = "json"
+  fs.defaultFS = "hdfs://namenode001"
+}
+
 ```
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
index 13173c14e..03fc5a56d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/pom.xml
@@ -50,6 +50,12 @@
             <scope>test</scope>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>connector-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>org.apache.parquet</groupId>
             <artifactId>parquet-avro</artifactId>
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
index 16f1bada9..fcccba986 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseSourceConfig.java
@@ -20,4 +20,5 @@ package org.apache.seatunnel.connectors.seatunnel.file.config;
 public class BaseSourceConfig {
     public static final String FILE_TYPE = "type";
     public static final String FILE_PATH = "path";
+    public static final String SCHEMA = "schema";
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
index 790e7ca64..a356ed0c6 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/FileFormat.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
+import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.OrcReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
@@ -52,7 +53,7 @@ public enum FileFormat implements Serializable {
     JSON("json") {
         @Override
         public ReadStrategy getReadStrategy() {
-            return new TextReadStrategy();
+            return new JsonReadStrategy();
         }
     };
 
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index eba6d4b21..215c4667c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 
@@ -33,13 +34,19 @@ import java.util.List;
 
 @Slf4j
 public abstract class AbstractReadStrategy implements ReadStrategy {
-    HadoopConf hadoopConf;
+    protected HadoopConf hadoopConf;
+    protected SeaTunnelRowType seaTunnelRowType;
 
     @Override
     public void init(HadoopConf conf) {
         this.hadoopConf = conf;
     }
 
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        this.seaTunnelRowType = seaTunnelRowType;
+    }
+
     @Override
     public Configuration getConfiguration(HadoopConf hadoopConf) {
         Configuration configuration = new Configuration();
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
new file mode 100644
index 000000000..9e5775fb3
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Read strategy for files that hold one JSON document per line.
+ *
+ * <p>Every line of the file is passed, together with the output collector, to a
+ * {@link JsonDeserializationSchema} that is built from the row type supplied through
+ * {@link #setSeaTunnelRowTypeInfo(SeaTunnelRowType)}.
+ */
+public class JsonReadStrategy extends AbstractReadStrategy {
+    /** Created in {@link #setSeaTunnelRowTypeInfo(SeaTunnelRowType)}; unset until that call. */
+    private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+    /**
+     * Stores the row type and builds the JSON deserializer for it.
+     *
+     * @param seaTunnelRowType row type the JSON lines are deserialized into
+     */
+    @Override
+    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+        // NOTE(review): the two boolean flags are both passed as false; their meaning is defined
+        // by JsonDeserializationSchema (presumably fail-on-missing-field / ignore-parse-errors) - confirm.
+        deserializationSchema = new JsonDeserializationSchema(false, false, this.seaTunnelRowType);
+    }
+
+    /**
+     * Reads the file at {@code path} line by line and hands each line to the JSON deserializer.
+     *
+     * @param path   path of the file to read, resolved against the configured file system
+     * @param output collector given to the deserializer for every line
+     * @throws Exception if the file cannot be opened or read; an {@link IOException} raised while
+     *                   deserializing a line is rethrown wrapped in a {@link RuntimeException}
+     */
+    @Override
+    public void read(String path, Collector<SeaTunnelRow> output) throws Exception {
+        Configuration conf = getConfiguration();
+        // The FileSystem is intentionally left open, as in the original code.
+        // NOTE(review): FileSystem.get typically returns a cached, shared instance - confirm before adding close().
+        FileSystem fs = FileSystem.get(conf);
+        Path filePath = new Path(path);
+        try (BufferedReader reader = new BufferedReader(new InputStreamReader(fs.open(filePath), StandardCharsets.UTF_8))) {
+            reader.lines().forEach(line -> {
+                try {
+                    // Encode with the same charset used for decoding; the no-arg getBytes() would use
+                    // the platform default charset and could corrupt non-ASCII content.
+                    deserializationSchema.deserialize(line.getBytes(StandardCharsets.UTF_8), output);
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+            });
+        }
+    }
+
+    /**
+     * Returns the row type previously supplied through
+     * {@link #setSeaTunnelRowTypeInfo(SeaTunnelRowType)}; both arguments are ignored.
+     *
+     * @param hadoopConf unused
+     * @param path       unused
+     * @return the stored row type, or {@code null} if none has been set
+     */
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException {
+        return this.seaTunnelRowType;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index 1114eab65..4f512a7db 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -38,5 +38,7 @@ public interface ReadStrategy extends Serializable {
 
     SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String path) throws FilePluginException;
 
+    void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
+
     List<String> getFileNamesByPath(HadoopConf hadoopConf, String path) throws IOException;
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
index 44cddd879..f0474a2c1 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/HdfsFileSource.java
@@ -22,6 +22,7 @@ import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeatunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
@@ -57,10 +58,19 @@ public class HdfsFileSource extends BaseFileSource {
         } catch (IOException e) {
             throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
         }
-        try {
-            rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
-        } catch (FilePluginException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+        // support user-defined schema
+        if (pluginConfig.hasPath(HdfsSourceConfig.SCHEMA)) {
+            Config schemaConfig = pluginConfig.getConfig(HdfsSourceConfig.SCHEMA);
+            rowType = SeatunnelSchema
+                    .buildWithConfig(schemaConfig)
+                    .getSeaTunnelRowType();
+            readStrategy.setSeaTunnelRowTypeInfo(rowType);
+        } else {
+            try {
+                rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
+            } catch (FilePluginException e) {
+                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+            }
         }
     }
 }